Refactor Raft leader election to use multiple threads

Summary:
Implement log replication

Rebase and fix src/CMakeLists.txt

Some style fixes

Changed shared_ptr to unique_ptr for RaftPeerState

Change Id and Leader to const

Move implementation to separate class

Fix raft_experiments.cpp

Reviewers: mislav.bradac

Reviewed By: mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1033
Marin Tomic 2017-12-07 09:20:11 +01:00
parent 381faf9dd4
commit 0bf692f8ec
9 changed files with 1731 additions and 531 deletions

View File

@@ -7,7 +7,6 @@ set(memgraph_src_files
communication/messaging/distributed.cpp
communication/messaging/local.cpp
communication/messaging/protocol.cpp
communication/raft/raft.cpp
communication/reactor/protocol.cpp
communication/reactor/reactor_distributed.cpp
communication/reactor/reactor_local.cpp

View File

@@ -0,0 +1,642 @@
#pragma once
#include "fmt/format.h"
#include "glog/logging.h"
namespace communication::raft {
namespace impl {
template <class State>
RaftMemberImpl<State>::RaftMemberImpl(RaftNetworkInterface<State> &network,
RaftStorageInterface<State> &storage,
const MemberId &id,
const RaftConfig &config)
: network_(network), storage_(storage), id_(id), config_(config) {
std::lock_guard<std::mutex> lock(mutex_);
std::tie(term_, voted_for_) = storage_.GetTermAndVotedFor();
for (const auto &peer_id : config_.members) {
peer_states_[peer_id] = std::make_unique<RaftPeerState>();
}
SetElectionTimer();
}
template <class State>
RaftMemberImpl<State>::~RaftMemberImpl() {
Stop();
}
template <class State>
void RaftMemberImpl<State>::Stop() {
{
std::lock_guard<std::mutex> lock(mutex_);
if (!exiting_) {
LogInfo("Stopping...");
exiting_ = true;
}
}
state_changed_.notify_all();
}
template <class State>
template <class... Args>
void RaftMemberImpl<State>::LogInfo(const std::string &format,
Args &&... args) {
LOG(INFO) << fmt::format("[id = {}, term = {}] {}", id_, term_,
fmt::format(format, std::forward<Args>(args)...))
<< std::endl;
}
template <class State>
void RaftMemberImpl<State>::TimerThreadMain() {
std::unique_lock<std::mutex> lock(mutex_);
while (!exiting_) {
if (Clock::now() >= next_election_time_) {
StartNewElection();
}
state_changed_.wait_until(lock, next_election_time_);
}
}
template <class State>
void RaftMemberImpl<State>::PeerThreadMain(std::string peer_id) {
RaftPeerState &peer_state = *peer_states_[peer_id];
LogInfo("Peer thread started for {}", peer_id);
std::unique_lock<std::mutex> lock(mutex_);
/* This loop will either call a function that issues an RPC or wait on the
* condition variable. It must not do both! Lock on `mutex_` is released while
* waiting for RPC response, which might cause us to miss a notification on
* `state_changed_` conditional variable and wait indefinitely. The safest
* thing to do is to assume some important part of state was modified while we
* were waiting for the response and loop around to check. */
while (!exiting_) {
TimePoint wait_until = TimePoint::max();
switch (mode_) {
case RaftMode::FOLLOWER:
break;
case RaftMode::CANDIDATE:
if (!peer_state.request_vote_done) {
RequestVote(peer_id, peer_state, lock);
continue;
}
break;
case RaftMode::LEADER:
if (peer_state.next_index <= storage_.GetLastLogIndex() ||
Clock::now() >= peer_state.next_heartbeat_time) {
AppendEntries(peer_id, peer_state, lock);
continue;
} else {
wait_until = peer_state.next_heartbeat_time;
}
break;
}
state_changed_.wait_until(lock, wait_until);
}
LogInfo("Peer thread exiting for {}", peer_id);
}
template <class State>
void RaftMemberImpl<State>::CandidateOrLeaderTransitionToFollower() {
DCHECK(mode_ != RaftMode::FOLLOWER)
<< "`CandidateOrLeaderTransitionToFollower` called from follower mode";
mode_ = RaftMode::FOLLOWER;
leader_ = {};
SetElectionTimer();
}
template <class State>
void RaftMemberImpl<State>::CandidateTransitionToLeader() {
DCHECK(mode_ == RaftMode::CANDIDATE)
<< "`CandidateTransitionToLeader` called while not in candidate mode";
mode_ = RaftMode::LEADER;
leader_ = id_;
/* We don't want to trigger elections while in leader mode. */
next_election_time_ = TimePoint::max();
/* [Raft thesis, Section 6.4]
* "The Leader Completeness Property guarantees that a leader has all
* committed entries, but at the start of its term, it may not know which
* those are. To find out, it needs to commit an entry from its term. Raft
* handles this by having each leader commit a blank no-op entry into the log
* at the start of its term. As soon as this no-op entry is committed, the
* leader's commit index will be at least as large as any other servers'
* during its term." */
LogEntry<State> entry;
entry.term = term_;
entry.command = std::experimental::nullopt;
storage_.AppendLogEntry(entry);
}
template <class State>
void RaftMemberImpl<State>::UpdateTermAndVotedFor(
const TermId new_term,
const std::experimental::optional<MemberId> &new_voted_for) {
term_ = new_term;
voted_for_ = new_voted_for;
leader_ = {};
storage_.WriteTermAndVotedFor(term_, voted_for_);
}
template <class State>
void RaftMemberImpl<State>::SetElectionTimer() {
/* [Raft thesis, section 3.4]
* "Raft uses randomized election timeouts to ensure that split votes are rare
* and that they are resolved quickly. To prevent split votes in the first
* place, election timeouts are chosen randomly from a fixed interval (e.g.,
* 150–300 ms)." */
std::uniform_int_distribution<uint64_t> distribution(
config_.leader_timeout_min.count(), config_.leader_timeout_max.count());
Clock::duration wait_interval = std::chrono::milliseconds(distribution(rng_));
next_election_time_ = Clock::now() + wait_interval;
}
template <class State>
bool RaftMemberImpl<State>::SendRPC(const std::string &recipient,
const PeerRPCRequest<State> &request,
PeerRPCReply &reply,
std::unique_lock<std::mutex> &lock) {
DCHECK(mode_ != RaftMode::FOLLOWER) << "Follower should not send RPCs";
bool was_candidate = mode_ == RaftMode::CANDIDATE;
/* Release lock before issuing RPC and waiting for response. */
/* TODO(mtomic): Revise how this will work with RPC cancellation. */
lock.unlock();
bool ok = network_.SendRPC(recipient, request, reply);
lock.lock();
/* TODO(mtomic): RPC retrying */
if (!ok) {
return false;
}
/* We released the lock while waiting for RPC response. It is possible that
* the internal state has changed while we were waiting and we don't care about
* the RPC reply anymore, for any of these reasons:
* (a) we are not the leader anymore
* (b) election timeout
* (c) we are elected as leader
* (d) our election was interrupted by another leader
* (e) destructor was called
*/
if (term_ != request.Term() ||
(was_candidate && mode_ != RaftMode::CANDIDATE) || exiting_) {
LogInfo("Ignoring RPC reply from {}", recipient);
return false;
}
/* [Raft thesis, Section 3.3]
* "Current terms are exchanged whenever servers communicate; if one server's
* current term is smaller than the other's, then it updates its current term
* to the larger value. If a candidate or leader discovers that its term is
* out of date, it immediately reverts to follower state." */
if (reply.Term() > term_) {
UpdateTermAndVotedFor(reply.Term(), {});
CandidateOrLeaderTransitionToFollower();
state_changed_.notify_all();
return false;
}
return true;
}
template <class State>
void RaftMemberImpl<State>::StartNewElection() {
LogInfo("Starting new election");
/* [Raft thesis, section 3.4]
* "To begin an election, a follower increments its current term and
* transitions to candidate state. It then votes for itself and issues
* RequestVote RPCs in parallel to each of the other servers in the cluster."
*/
UpdateTermAndVotedFor(term_ + 1, id_);
mode_ = RaftMode::CANDIDATE;
/* [Raft thesis, section 3.4]
* "Each candidate restarts its randomized election timeout at the start of an
* election, and it waits for that timeout to elapse before starting the next
* election; this reduces the likelihood of another split vote in the new
* election." */
SetElectionTimer();
for (const auto &peer_id : config_.members) {
if (peer_id == id_) {
continue;
}
auto &peer_state = peer_states_[peer_id];
peer_state->request_vote_done = false;
peer_state->voted_for_me = false;
peer_state->match_index = 0;
peer_state->next_index = storage_.GetLastLogIndex() + 1;
/* [Raft thesis, section 3.5]
* "Until the leader has discovered where it and the follower's logs match,
* the leader can send AppendEntries with no entries (like heartbeats) to
* save bandwidth. Then, once the matchIndex immediately precedes the
* nextIndex, the leader should begin to send the actual entries." */
peer_state->suppress_log_entries = true;
/* [Raft thesis, section 3.4]
* "Once a candidate wins an election, it becomes leader. It then sends
* heartbeat messages to all of the other servers to establish its authority
* and prevent new elections."
*
* This will make newly elected leader send heartbeats immediately.
*/
peer_state->next_heartbeat_time = TimePoint::min();
}
/* Notify peer threads to start issuing RequestVote RPCs. */
state_changed_.notify_all();
}
template <class State>
bool RaftMemberImpl<State>::CountVotes() {
DCHECK(mode_ == RaftMode::CANDIDATE)
<< "`CountVotes` should only be called from candidate mode";
int num_votes = 0;
for (const auto &peer_id : config_.members) {
if (peer_id == id_ || peer_states_[peer_id]->voted_for_me) {
num_votes++;
}
}
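/* Strict majority: our own vote plus the granted votes must account for more
* than half of all cluster members. */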
return 2 * num_votes > config_.members.size();
}
template <class State>
void RaftMemberImpl<State>::RequestVote(const std::string &peer_id,
RaftPeerState &peer_state,
std::unique_lock<std::mutex> &lock) {
LogInfo("Requesting vote from {}", peer_id);
PeerRPCRequest<State> request;
request.type = PeerRPCRequest<State>::Type::REQUEST_VOTE;
request.request_vote.candidate_term = term_;
request.request_vote.candidate_id = id_;
PeerRPCReply reply;
/* Release lock before issuing RPC and waiting for response. */
if (!SendRPC(peer_id, request, reply, lock)) {
return;
}
DCHECK(reply.request_vote.term >= term_) << "Stale RequestVote RPC reply";
peer_state.request_vote_done = true;
if (reply.request_vote.vote_granted) {
peer_state.voted_for_me = true;
LogInfo("Got vote from {}", peer_id);
if (CountVotes()) {
LogInfo("Elected as leader.");
CandidateTransitionToLeader();
}
} else {
LogInfo("Vote denied from {}", peer_id);
}
state_changed_.notify_all();
}
template <class State>
void RaftMemberImpl<State>::AdvanceCommitIndex() {
DCHECK(mode_ == RaftMode::LEADER)
<< "`AdvanceCommitIndex` can only be called from leader mode";
std::vector<LogIndex> match_indices;
for (const auto &peer : peer_states_) {
match_indices.push_back(peer.second->match_index);
}
match_indices.push_back(storage_.GetLastLogIndex());
sort(match_indices.begin(), match_indices.end(), std::greater<LogIndex>());
LogIndex new_commit_index_ = match_indices[(config_.members.size() - 1) / 2];
LogInfo("Trying to advance commit index {} to {}", commit_index_,
new_commit_index_);
/* This can happen because we reset `match_index` to 0 for every peer when
* elected. */
if (commit_index_ >= new_commit_index_) {
return;
}
/* [Raft thesis, section 3.6.2]
* (...) Raft never commits log entries from previous terms by counting
* replicas. Only log entries from the leader's current term are committed by
* counting replicas; once an entry from the current term has been committed
* in this way, then all prior entries are committed indirectly because of the
* Log Matching Property." */
if (storage_.GetLogTerm(new_commit_index_) != term_) {
LogInfo("Cannot commit log entry from previous term");
return;
}
commit_index_ = std::max(commit_index_, new_commit_index_);
}
template <class State>
void RaftMemberImpl<State>::AppendEntries(const std::string &peer_id,
RaftPeerState &peer_state,
std::unique_lock<std::mutex> &lock) {
LogInfo("Appending entries to {}", peer_id);
PeerRPCRequest<State> request;
request.type = PeerRPCRequest<State>::Type::APPEND_ENTRIES;
request.append_entries.leader_term = term_;
request.append_entries.leader_id = id_;
request.append_entries.prev_log_index = peer_state.next_index - 1;
request.append_entries.prev_log_term =
storage_.GetLogTerm(peer_state.next_index - 1);
if (!peer_state.suppress_log_entries &&
peer_state.next_index <= storage_.GetLastLogIndex()) {
request.append_entries.entries =
storage_.GetLogSuffix(peer_state.next_index);
} else {
request.append_entries.entries = {};
}
request.append_entries.leader_commit = commit_index_;
PeerRPCReply reply;
if (!SendRPC(peer_id, request, reply, lock)) {
/* There is probably something wrong with this peer, let's avoid sending log
* entries. */
peer_state.suppress_log_entries = true;
return;
}
DCHECK(mode_ == RaftMode::LEADER)
<< "Elected leader for term should never change";
DCHECK(reply.append_entries.term == term_) << "Got stale AppendEntries reply";
if (reply.append_entries.success) {
/* We've found a match, we can start sending log entries. */
peer_state.suppress_log_entries = false;
LogIndex new_match_index = request.append_entries.prev_log_index +
request.append_entries.entries.size();
DCHECK(peer_state.match_index <= new_match_index)
<< "`match_index` should increase monotonically within a term";
peer_state.match_index = new_match_index;
AdvanceCommitIndex();
peer_state.next_index = peer_state.match_index + 1;
peer_state.next_heartbeat_time = Clock::now() + config_.heartbeat_interval;
} else {
DCHECK(peer_state.next_index > 1)
<< "Log replication should not fail for first log entry.";
--peer_state.next_index;
}
state_changed_.notify_all();
}
template <class State>
PeerRPCReply::RequestVote RaftMemberImpl<State>::OnRequestVote(
const typename PeerRPCRequest<State>::RequestVote &request) {
std::lock_guard<std::mutex> lock(mutex_);
LogInfo("RequestVote RPC request from {}", request.candidate_id);
PeerRPCReply::RequestVote reply;
/* [Raft thesis, Section 3.3]
* "If a server receives a request with a stale term number, it rejects the
* request." */
if (request.candidate_term < term_) {
reply.term = term_;
reply.vote_granted = false;
return reply;
}
/* [Raft thesis, Section 3.6.1]
* "Raft uses the voting process to prevent a candidate from winning an
* election unless its log contains all committed entries. (...) The
* RequestVote RPC implements this restriction: the RPC includes information
* about the candidate's log, and the voter denies its vote if its own log is
* more up-to-date than that of the candidate. Raft determines which of two
* logs is more up-to-date by comparing the index and term of the last entries
* in the logs. If the logs have last entries with different terms, then the
* log with the later term is more up-to-date. If the logs end with the same
* term, then whichever log is longer is more up-to-date." */
LogIndex my_last_log_index = storage_.GetLastLogIndex();
TermId my_last_log_term = storage_.GetLogTerm(my_last_log_index);
if (my_last_log_term > request.last_log_term ||
(my_last_log_term == request.last_log_term &&
my_last_log_index > request.last_log_index)) {
reply.term = term_;
reply.vote_granted = false;
return reply;
}
/* [Raft thesis, Section 3.4]
* "Each server will vote for at most one candidate in a given term, on a
* first-come-first-served basis."
*/
/* We voted for someone else in this term. */
if (request.candidate_term == term_ && voted_for_ &&
*voted_for_ != request.candidate_id) {
reply.term = term_;
reply.vote_granted = false;
return reply;
}
/* [Raft thesis, Section 3.3]
* "Current terms are exchanged whenever servers communicate; if one server's
* current term is smaller than the other's, then it updates its current term
* to the larger value. If a candidate or leader discovers that its term is
* out of date, it immediately reverts to follower state." */
if (request.candidate_term > term_) {
if (mode_ != RaftMode::FOLLOWER) {
CandidateOrLeaderTransitionToFollower();
}
}
/* Now we know we will vote for this candidate, because its term is at least
* as big as ours and we haven't voted for anyone else. */
UpdateTermAndVotedFor(request.candidate_term, request.candidate_id);
/* [Raft thesis, Section 3.4]
* A server remains in follower state as long as it receives valid RPCs from a
* leader or candidate. */
SetElectionTimer();
state_changed_.notify_all();
reply.term = request.candidate_term;
reply.vote_granted = true;
return reply;
}
template <class State>
PeerRPCReply::AppendEntries RaftMemberImpl<State>::OnAppendEntries(
const typename PeerRPCRequest<State>::AppendEntries &request) {
std::lock_guard<std::mutex> lock(mutex_);
LogInfo("AppendEntries RPC request from {}", request.leader_id);
PeerRPCReply::AppendEntries reply;
/* [Raft thesis, Section 3.3]
* "If a server receives a request with a stale term number, it rejects the
* request." */
if (request.leader_term < term_) {
reply.term = term_;
reply.success = false;
return reply;
}
/* [Raft thesis, Section 3.3]
* "Current terms are exchanged whenever servers communicate; if one server's
* current term is smaller than the other's, then it updates its current term
* to the larger value. If a candidate or leader discovers that its term is
* out of date, it immediately reverts to follower state." */
if (request.leader_term > term_) {
if (mode_ != RaftMode::FOLLOWER) {
CandidateOrLeaderTransitionToFollower();
}
UpdateTermAndVotedFor(request.leader_term, {});
}
/* [Raft thesis, Section 3.4]
* "While waiting for votes, a candidate may receive an AppendEntries RPC from
* another server claiming to be leader. If the leader's term (included in its
* RPC) is at least as large as the candidate's current term, then the
* candidate recognizes the leader as legitimate and returns to follower
* state." */
if (mode_ == RaftMode::CANDIDATE && request.leader_term == term_) {
CandidateOrLeaderTransitionToFollower();
}
DCHECK(mode_ != RaftMode::LEADER)
<< "Leader cannot accept `AppendEntries` RPC";
DCHECK(term_ == request.leader_term) << "Term should be equal to request "
"term when accepting `AppendEntries` "
"RPC";
leader_ = request.leader_id;
/* [Raft thesis, Section 3.4]
* A server remains in follower state as long as it receives valid RPCs from a
* leader or candidate. */
SetElectionTimer();
state_changed_.notify_all();
/* [Raft thesis, Section 3.5]
* "When sending an AppendEntries RPC, the leader includes the index and term
* of the entry in its log that immediately precedes the new entries. If the
* follower does not find an entry in its log with the same index and term,
* then it refuses the new entries." */
if (request.prev_log_index > storage_.GetLastLogIndex() ||
storage_.GetLogTerm(request.prev_log_index) != request.prev_log_term) {
reply.term = term_;
reply.success = false;
return reply;
}
/* [Raft thesis, Section 3.5]
* "To bring a follower's log into consistency with its own, the leader must
* find the latest log entry where the two logs agree, delete any entries in
* the follower's log after that point, and send the follower all of the
* leader's entries after that point." */
/* Entry at `request.prev_log_index` is the last entry where ours and leader's
* logs agree. It's time to replace the tail of the log with new entries from
* the leader. We have to be careful here as duplicated AppendEntries RPCs
* could cause data loss.
*
* There is a possibility that an old AppendEntries RPC is duplicated and
* received after processing newer one. For example, leader appends entry 3
* and then entry 4, but follower receives entry 3, then entry 4, and then
* entry 3 again. We have to be careful not to delete entry 4 from log when
* processing the last RPC. */
LogIndex index = request.prev_log_index;
auto it = request.entries.begin();
for (; it != request.entries.end(); ++it) {
++index;
if (index > storage_.GetLastLogIndex()) {
break;
}
if (storage_.GetLogTerm(index) != it->term) {
LogInfo("Truncating log suffix from index {}", index);
DCHECK(commit_index_ < index)
<< "Committed entries should never be truncated form the log";
storage_.TruncateLogSuffix(index);
break;
}
}
LogInfo("Appending {} out of {} logs from {}.", request.entries.end() - it,
request.entries.size(), request.leader_id);
for (; it != request.entries.end(); ++it) {
storage_.AppendLogEntry(*it);
}
commit_index_ = std::max(commit_index_, request.leader_commit);
/* Let's bump election timer once again, we don't want to take down the leader
* because of our long disk writes. */
SetElectionTimer();
state_changed_.notify_all();
reply.term = term_;
reply.success = true;
return reply;
}
} // namespace impl
template <class State>
RaftMember<State>::RaftMember(RaftNetworkInterface<State> &network,
RaftStorageInterface<State> &storage,
const MemberId &id, const RaftConfig &config)
: impl_(network, storage, id, config) {
timer_thread_ =
std::thread(&impl::RaftMemberImpl<State>::TimerThreadMain, &impl_);
for (const auto &peer_id : config.members) {
if (peer_id != id) {
peer_threads_.emplace_back(&impl::RaftMemberImpl<State>::PeerThreadMain,
&impl_, peer_id);
}
}
}
template <class State>
RaftMember<State>::~RaftMember() {
impl_.Stop();
timer_thread_.join();
for (auto &peer_thread : peer_threads_) {
peer_thread.join();
}
}
template <class State>
PeerRPCReply::RequestVote RaftMember<State>::OnRequestVote(
const typename PeerRPCRequest<State>::RequestVote &request) {
return impl_.OnRequestVote(request);
}
template <class State>
PeerRPCReply::AppendEntries RaftMember<State>::OnAppendEntries(
const typename PeerRPCRequest<State>::AppendEntries &request) {
return impl_.OnAppendEntries(request);
}
} // namespace communication::raft

View File

@@ -1,263 +0,0 @@
#include "raft.hpp"
#include <iostream>
#include "fmt/format.h"
#include "glog/logging.h"
#include "communication/raft/raft_network.hpp"
using std::experimental::optional;
using std::placeholders::_1;
using std::placeholders::_2;
using namespace communication::reactor;
using namespace std::chrono_literals;
namespace communication::raft {
RaftMember::RaftMember(System &system, const std::string &id,
const RaftConfig &config, RaftNetworkInterface &network)
: id_(id),
system_(system),
config_(config),
network_(network),
mode_(Mode::FOLLOWER),
leader_watchdog_(config_.leader_timeout_min, config_.leader_timeout_max,
[this]() {
LocalChannelWriter channel(id_, "main", system_);
channel.Send<MLeaderTimeout>();
}),
heartbeat_watchdog_(
config_.heartbeat_interval, config_.heartbeat_interval,
[this]() {
for (const auto &member : config_.members) {
if (id_ != member) {
network_.AppendEntries(member, MAppendEntries(term_, id_));
}
}
},
true),
reactor_(system.Spawn(id, [this](Reactor &r) {
EventStream *stream = r.main_.first;
stream->OnEvent<MLeaderTimeout>([this](
const MLeaderTimeout &, const Subscription &) { RunElection(); });
stream->OnEvent<MRequestVote>(
[this](const MRequestVote &req, const Subscription &) {
network_.RequestVoteReply(req.sender_id, OnRequestVote(req));
});
stream->OnEvent<MRequestVoteReply>(
[this](const MRequestVoteReply &req, const Subscription &) {
OnRequestVoteReply(req);
});
stream->OnEvent<MAppendEntries>(
[this](const MAppendEntries &req, const Subscription &) {
network_.AppendEntriesReply(req.sender_id, OnAppendEntries(req));
});
stream->OnEvent<MAppendEntriesReply>(
[this](const MAppendEntriesReply &rep, const Subscription &) {
OnAppendEntriesReply(rep);
});
})) {}
RaftMember::~RaftMember() { LogInfo("Shutting down..."); }
template <class... Args>
void RaftMember::LogInfo(const std::string &format, Args &&... args) {
LOG(INFO) << fmt::format("(node = {}, term = {}) ", id_, term_)
<< fmt::format(format, std::forward<Args>(args)...) << std::endl;
}
void RaftMember::TransitionToFollower() {
/* Stop sending heartbeats to followers, start listening for them. */
heartbeat_watchdog_.Block();
leader_watchdog_.Unblock();
mode_ = Mode::FOLLOWER;
}
void RaftMember::TransitionToCandidate() {
/* This transition only happens if we are in follower or candidate mode.
* Leader timeout watchdog is not blocked because it also triggers new
* elections if the current one times out. */
DCHECK(mode_ != Mode::LEADER)
<< "Transition to candidate mode from leader mode.";
mode_ = Mode::CANDIDATE;
votes_ = {};
}
void RaftMember::TransitionToLeader() {
/* This transition only happens if we are in candidate mode. */
DCHECK(mode_ == Mode::CANDIDATE)
<< "Transition to leader mode from leader or follower mode.";
/* Stop listening for leader heartbeats, start sending them. */
leader_watchdog_.Block();
heartbeat_watchdog_.Unblock();
mode_ = Mode::LEADER;
leader_ = id_;
}
void RaftMember::UpdateTerm(int new_term) {
term_ = new_term;
voted_for_ = {};
leader_ = {};
}
void RaftMember::RunElection() {
/* Elections will be skipped if we believe we are the leader. This can happen
* if leader timeout message was delayed for some reason. */
if (mode_ == Mode::LEADER) {
return;
}
LogInfo("Running for leader.");
/* [Raft paper, Section 5.2.]
* "To begin an election, a follower increments its current term and
* transitions to candidate state. It then votes for itself and issues
* RequestVote RPCs in parallel to each of the other servers in
* the cluster." */
TransitionToCandidate();
UpdateTerm(term_ + 1);
voted_for_ = id_;
votes_.insert(id_);
for (const auto &member_id : config_.members) {
if (member_id != id_) {
network_.RequestVote(member_id, MRequestVote(term_, id_));
}
}
}
MRequestVoteReply RaftMember::OnRequestVote(const MRequestVote &req) {
LogInfo("Vote requested from {}, candidate_term = {}", req.sender_id,
req.term);
/* [Raft paper, Section 5.1]
* "Current terms are exchanged whenever servers communicate; if one server's
* current term is smaller than the other's, then it updates its current term
* to the larger value. If a candidate or leader discovers that its term is
* out of date, it immediately reverts to follower state." */
if (req.term > term_) {
TransitionToFollower();
UpdateTerm(req.term);
}
/* [Raft paper, Figure 2]
* "Reply false if term < currentTerm." */
if (req.term < term_) {
return MRequestVoteReply(term_, id_, false);
}
/* [Raft paper, Figure 2]
* "If votedFor is null or candidateId, and candidate's log is at least as
* up-to-date as receiver's log, grant vote." */
if (voted_for_ && *voted_for_ != req.sender_id) {
return MRequestVoteReply(term_, id_, false);
}
LogInfo("Granting vote to {}.", req.sender_id);
voted_for_ = req.sender_id;
/* [Raft paper, Section 5.2]
* "A server remains in follower state as long as it receives valid RPCs from
* a leader or candidate." */
leader_watchdog_.Notify();
DCHECK(mode_ != Mode::LEADER) << "Granted vote as a leader.";
return MRequestVoteReply(term_, id_, true);
}
void RaftMember::OnRequestVoteReply(const MRequestVoteReply &rep) {
/* Ignore leftover messages from old elections. */
if (mode_ != Mode::CANDIDATE || rep.term < term_) {
return;
}
/* [Raft paper, Section 5.1]
* "Current terms are exchanged whenever servers communicate; if one server's
* current term is smaller than the other's, then it updates its current term
* to the larger value. If a candidate or leader discovers that its term is
* out of date, it immediately reverts to follower state." */
if (rep.term > term_) {
LogInfo(
"Vote denied from {} with greater term {}, transitioning to follower "
"mode.",
rep.sender_id, rep.term);
TransitionToFollower();
UpdateTerm(rep.term);
return;
}
if (!rep.success) {
LogInfo("Vote rejected from {}.", rep.sender_id);
return;
}
LogInfo("Vote granted from {}.", rep.sender_id);
votes_.insert(rep.sender_id);
if (2 * votes_.size() > config_.members.size()) {
LogInfo("Elected as leader.");
TransitionToLeader();
}
}
MAppendEntriesReply RaftMember::OnAppendEntries(const MAppendEntries &req) {
LogInfo("Append entries from {}.", req.sender_id);
/* [Raft paper, Section 5.1]
* "If a server receives a request with a stale term number, it rejects
* the request." */
if (req.term < term_) {
return MAppendEntriesReply(term_, id_, false);
}
/* [Raft paper, Section 5.1]
* "Current terms are exchanged whenever servers communicate; if one server's
* current term is smaller than the other's, then it updates its current term
* to the larger value. If a candidate or leader discovers that its term is
* out of date, it immediately reverts to follower state." */
if (req.term > term_) {
TransitionToFollower();
UpdateTerm(req.term);
}
/* [Raft paper, Section 5.2]
* "While waiting for votes, a candidate may receive an AppendEntries RPC from
* another server claiming to be leader. If the leader's term (included in its
* RPC) is at least as large as the candidate's current term, then the
* candidate recognizes the leader as legitimate and returns to follower
* state." */
if (req.term == term_ && mode_ == Mode::CANDIDATE) {
TransitionToFollower();
UpdateTerm(req.term);
}
/* [Raft paper, Section 5.2]
* "A server remains in follower state as long as it receives
* valid RPCs from a leader or candidate." */
leader_ = req.sender_id;
leader_watchdog_.Notify();
return MAppendEntriesReply(term_, id_, true);
}
void RaftMember::OnAppendEntriesReply(const MAppendEntriesReply &rep) {
/* [Raft paper, Section 5.1]
* "Current terms are exchanged whenever servers communicate; if one server's
* current term is smaller than the other's, then it updates its current term
* to the larger value. If a candidate or leader discovers that its term is
* out of date, it immediately reverts to follower state." */
if (rep.term > term_) {
TransitionToFollower();
UpdateTerm(rep.term);
}
}
} // namespace communication::raft

View File

@@ -1,70 +1,258 @@
#pragma once
#include <chrono>
#include <condition_variable>
#include <experimental/optional>
#include <map>
#include <mutex>
#include <random>
#include <set>
#include <thread>
#include <vector>
#include "communication/reactor/reactor_local.hpp"
#include "utils/watchdog.hpp"
namespace communication::raft {
struct MAppendEntries;
struct MAppendEntriesReply;
struct MRequestVote;
struct MRequestVoteReply;
using Clock = std::chrono::system_clock;
using TimePoint = std::chrono::system_clock::time_point;
class RaftNetworkInterface;
using MemberId = std::string;
using TermId = uint64_t;
using ClientId = uint64_t;
using CommandId = uint64_t;
using LogIndex = uint64_t;
template <class State>
struct LogEntry {
int term;
std::experimental::optional<typename State::Change> command;
bool operator==(const LogEntry &rhs) const {
return term == rhs.term && command == rhs.command;
}
bool operator!=(const LogEntry &rhs) const { return !(*this == rhs); }
};
/* Raft RPC requests and replies as described in [Raft thesis, Figure 3.1]. */
template <class State>
struct PeerRPCRequest {
enum class Type { REQUEST_VOTE, APPEND_ENTRIES };
Type type;
struct RequestVote {
TermId candidate_term;
MemberId candidate_id;
LogIndex last_log_index;
TermId last_log_term;
} request_vote;
struct AppendEntries {
TermId leader_term;
MemberId leader_id;
LogIndex prev_log_index;
TermId prev_log_term;
std::vector<LogEntry<State>> entries;
LogIndex leader_commit;
} append_entries;
TermId Term() const {
switch (type) {
case Type::REQUEST_VOTE:
return request_vote.candidate_term;
case Type::APPEND_ENTRIES:
return append_entries.leader_term;
}
}
};
struct PeerRPCReply {
enum class Type { REQUEST_VOTE, APPEND_ENTRIES };
Type type;
struct RequestVote {
TermId term;
bool vote_granted;
} request_vote;
struct AppendEntries {
TermId term;
bool success;
} append_entries;
TermId Term() const {
switch (type) {
case Type::REQUEST_VOTE:
return request_vote.term;
case Type::APPEND_ENTRIES:
return append_entries.term;
}
}
};
template <class State>
class RaftNetworkInterface {
public:
virtual ~RaftNetworkInterface() = default;
/* Returns false if RPC failed for some reason (e.g. cannot establish
* connection, request timeout or request cancelled). Otherwise `reply`
* contains response from peer. */
virtual bool SendRPC(const MemberId &recipient,
const PeerRPCRequest<State> &request,
PeerRPCReply &reply) = 0;
};
template <class State>
class RaftStorageInterface {
public:
virtual ~RaftStorageInterface() = default;
virtual void WriteTermAndVotedFor(
const TermId term,
const std::experimental::optional<std::string> &voted_for) = 0;
virtual std::pair<TermId, std::experimental::optional<MemberId>>
GetTermAndVotedFor() = 0;
virtual void AppendLogEntry(const LogEntry<State> &entry) = 0;
virtual TermId GetLogTerm(const LogIndex index) = 0;
virtual LogEntry<State> GetLogEntry(const LogIndex index) = 0;
virtual std::vector<LogEntry<State>> GetLogSuffix(const LogIndex index) = 0;
virtual LogIndex GetLastLogIndex() = 0;
virtual void TruncateLogSuffix(const LogIndex index) = 0;
};
struct RaftConfig {
std::vector<std::string> members;
std::vector<MemberId> members;
std::chrono::milliseconds leader_timeout_min;
std::chrono::milliseconds leader_timeout_max;
std::chrono::milliseconds heartbeat_interval;
};
class RaftMember {
namespace impl {
enum class RaftMode { FOLLOWER, CANDIDATE, LEADER };
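/* Volatile per-peer bookkeeping: RequestVote progress while we are a candidate
* (`request_vote_done`, `voted_for_me`) and replication/heartbeat state while
* we are the leader (`match_index`, `next_index`, `suppress_log_entries`,
* `next_heartbeat_time`). */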
struct RaftPeerState {
bool request_vote_done;
bool voted_for_me;
LogIndex match_index;
LogIndex next_index;
bool suppress_log_entries;
Clock::time_point next_heartbeat_time;
};
template <class State>
class RaftMemberImpl {
public:
RaftMember(communication::reactor::System &system, const std::string &id,
const RaftConfig &config, RaftNetworkInterface &network);
virtual ~RaftMember();
explicit RaftMemberImpl(RaftNetworkInterface<State> &network,
RaftStorageInterface<State> &storage,
const MemberId &id, const RaftConfig &config);
protected:
std::string id_;
std::experimental::optional<std::string> leader_;
~RaftMemberImpl();
private:
enum class Mode { FOLLOWER, CANDIDATE, LEADER };
void Stop();
communication::reactor::System &system_;
RaftConfig config_;
RaftNetworkInterface &network_;
Mode mode_;
uint64_t term_ = 0;
std::experimental::optional<std::string> voted_for_;
void TimerThreadMain();
void PeerThreadMain(std::string peer_id);
std::set<std::string> votes_;
void UpdateTermAndVotedFor(
const TermId new_term,
const std::experimental::optional<MemberId> &new_voted_for);
void CandidateOrLeaderTransitionToFollower();
void CandidateTransitionToLeader();
Watchdog leader_watchdog_;
Watchdog heartbeat_watchdog_;
bool SendRPC(const std::string &recipient,
const PeerRPCRequest<State> &request, PeerRPCReply &reply,
std::unique_lock<std::mutex> &lock);
std::unique_ptr<reactor::Reactor> reactor_;
void StartNewElection();
void SetElectionTimer();
bool CountVotes();
void RequestVote(const MemberId &peer_id, RaftPeerState &peer_state,
std::unique_lock<std::mutex> &lock);
void RunElection();
void TransitionToFollower();
void TransitionToCandidate();
void TransitionToLeader();
void UpdateTerm(int new_term);
void AdvanceCommitIndex();
void AppendEntries(const MemberId &peer_id, RaftPeerState &peer_state,
std::unique_lock<std::mutex> &lock);
MRequestVoteReply OnRequestVote(const MRequestVote &);
void OnRequestVoteReply(const MRequestVoteReply &);
MAppendEntriesReply OnAppendEntries(const MAppendEntries &);
void OnAppendEntriesReply(const MAppendEntriesReply &);
PeerRPCReply::RequestVote OnRequestVote(
const typename PeerRPCRequest<State>::RequestVote &request);
PeerRPCReply::AppendEntries OnAppendEntries(
const typename PeerRPCRequest<State>::AppendEntries &request);
template <class... Args>
void LogInfo(const std::string &, Args &&...);
RaftNetworkInterface<State> &network_;
RaftStorageInterface<State> &storage_;
MemberId id_;
RaftConfig config_;
TermId term_;
RaftMode mode_ = RaftMode::FOLLOWER;
std::experimental::optional<MemberId> voted_for_ = std::experimental::nullopt;
std::experimental::optional<MemberId> leader_ = std::experimental::nullopt;
TimePoint next_election_time_;
LogIndex commit_index_ = 0;
bool exiting_ = false;
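/* Per-peer state, one entry per cluster member, created in the constructor and
* mutated only while holding `mutex_`. */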
std::map<std::string, std::unique_ptr<RaftPeerState>> peer_states_;
/* This mutex protects all of the internal state. */
std::mutex mutex_;
/* Used to notify waiting threads that some of the internal state has changed.
* It is notified when the following events occur:
* - mode change
* - election start
* - `next_election_time_` update on RPC from leader or candidate
* - destructor is called
* - `commit_index_` is advanced
*/
std::condition_variable state_changed_;
std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}());
};
} // namespace impl
template <class State>
class RaftMember final {
public:
explicit RaftMember(RaftNetworkInterface<State> &network,
RaftStorageInterface<State> &storage, const MemberId &id,
const RaftConfig &config);
~RaftMember();
/* Just to make the tests work for now until we clean up the reactor stuff. */
std::experimental::optional<MemberId> Leader() {
std::lock_guard<std::mutex> lock(impl_.mutex_);
return impl_.leader_;
}
MemberId Id() const { return impl_.id_; }
PeerRPCReply::RequestVote OnRequestVote(
const typename PeerRPCRequest<State>::RequestVote &request);
PeerRPCReply::AppendEntries OnAppendEntries(
const typename PeerRPCRequest<State>::AppendEntries &request);
private:
impl::RaftMemberImpl<State> impl_;
/* Timer thread for triggering elections. */
std::thread timer_thread_;
/* One thread per peer for outgoing RPCs. */
std::vector<std::thread> peer_threads_;
};
} // namespace communication::raft
#include "raft-inl.hpp"

View File

@@ -1,158 +0,0 @@
#pragma once
#include <experimental/optional>
#include <mutex>
#include "communication/reactor/reactor_local.hpp"
namespace communication::raft {
struct MLeaderTimeout : public communication::reactor::Message {};
struct RaftMessage : public communication::reactor::Message {
RaftMessage(int term, const std::string &sender_id)
: term(term), sender_id(sender_id) {}
int term;
std::string sender_id;
};
struct RaftMessageReply : public RaftMessage {
RaftMessageReply(int term, const std::string &sender_id, bool success)
: RaftMessage(term, sender_id), success(success) {}
std::experimental::optional<std::string> leader;
bool success;
};
struct MRequestVote : public RaftMessage {
MRequestVote(int candidate_term, const std::string &candidate_id)
: RaftMessage(candidate_term, candidate_id) {}
};
struct MRequestVoteReply : public RaftMessageReply {
MRequestVoteReply(int term, const std::string &sender_id, bool vote_granted)
: RaftMessageReply(term, sender_id, vote_granted) {}
};
struct MAppendEntries : public RaftMessage {
MAppendEntries(int term, const std::string &sender_id)
: RaftMessage(term, sender_id) {}
};
struct MAppendEntriesReply : public RaftMessageReply {
MAppendEntriesReply(int term, const std::string &sender_id, bool success)
: RaftMessageReply(term, sender_id, success) {}
};
class RaftNetworkInterface {
public:
virtual ~RaftNetworkInterface() {}
virtual bool RequestVote(const std::string &recipient,
const MRequestVote &msg) = 0;
virtual bool RequestVoteReply(const std::string &recipient,
const MRequestVoteReply &msg) = 0;
virtual bool AppendEntries(const std::string &recipient,
const MAppendEntries &msg) = 0;
virtual bool AppendEntriesReply(const std::string &recipient,
const MAppendEntriesReply &msg) = 0;
};
class LocalReactorNetworkInterface : public RaftNetworkInterface {
public:
explicit LocalReactorNetworkInterface(communication::reactor::System &system)
: system_(system) {}
bool RequestVote(const std::string &recipient,
const MRequestVote &msg) override {
return SendMessage(recipient, msg);
}
bool RequestVoteReply(const std::string &recipient,
const MRequestVoteReply &msg) override {
return SendMessage(recipient, msg);
}
bool AppendEntries(const std::string &recipient,
const MAppendEntries &msg) override {
return SendMessage(recipient, msg);
}
bool AppendEntriesReply(const std::string &recipient,
const MAppendEntriesReply &msg) override {
return SendMessage(recipient, msg);
}
private:
template <class TMessage>
bool SendMessage(const std::string &recipient, const TMessage &msg) {
reactor::LocalChannelWriter channel(recipient, "main", system_);
channel.Send<TMessage>(msg);
// TODO: We always return true here even though we do not know if message
// was delivered or not. Maybe the return value of functions in
// RaftNetworkInterface should be changed to be void, not bool.
return true;
}
communication::reactor::System &system_;
};
class FakeNetworkInterface : public RaftNetworkInterface {
public:
explicit FakeNetworkInterface(communication::reactor::System &system)
: system_(system) {}
bool RequestVote(const std::string &recipient,
const MRequestVote &msg) override {
return SendMessage(recipient, msg);
}
bool RequestVoteReply(const std::string &recipient,
const MRequestVoteReply &msg) override {
return SendMessage(recipient, msg);
}
bool AppendEntries(const std::string &recipient,
const MAppendEntries &msg) override {
return SendMessage(recipient, msg);
}
bool AppendEntriesReply(const std::string &recipient,
const MAppendEntriesReply &msg) override {
return SendMessage(recipient, msg);
}
void Disconnect(const std::string &id) {
std::lock_guard<std::mutex> guard(mutex_);
status_[id] = false;
}
void Connect(const std::string &id) {
std::lock_guard<std::mutex> guard(mutex_);
status_[id] = true;
}
private:
template <class TMessage>
bool SendMessage(const std::string &recipient, const TMessage &msg) {
bool ok;
{
std::lock_guard<std::mutex> guard(mutex_);
ok = status_[msg.sender_id] && status_[recipient];
}
if (ok) {
reactor::LocalChannelWriter channel(recipient, "main", system_);
channel.Send<TMessage>(msg);
}
return ok;
}
communication::reactor::System &system_;
std::mutex mutex_;
std::unordered_map<std::string, bool> status_;
};
} // namespace communication::raft

View File

@@ -0,0 +1,136 @@
#pragma once
#include <atomic>
#include "communication/raft/raft.hpp"
#include "communication/reactor/reactor_local.cpp"
/* This is junk, hopefully we get rid of reactor leftovers soon. */
namespace communication::raft {
using namespace communication::reactor;
template <class State>
class LocalReactorNetworkInterface : public RaftNetworkInterface<State> {
public:
explicit LocalReactorNetworkInterface(
communication::reactor::System &system, const std::string &id,
const std::function<PeerRPCReply(const PeerRPCRequest<State> &)>
&rpc_callback)
: id_(id),
reactor_(system.Spawn(
id,
[this, rpc_callback](reactor::Reactor &r) {
reactor::EventStream *stream = r.main_.first;
stream->OnEvent<MPeerRPCRequest>([this, rpc_callback, &r](
const MPeerRPCRequest &request,
const reactor::Subscription &) {
if (!connected_) {
return;
}
PeerRPCReply reply = rpc_callback(request.request);
reactor::LocalChannelWriter channel_writer(
request.sender, request.reply_channel, r.system_);
auto channel =
r.system_.Resolve(request.sender, request.reply_channel);
channel_writer.Send<MPeerRPCReply>(reply);
});
})),
connected_(false) {}
~LocalReactorNetworkInterface() {}
void Connect() { connected_ = true; }
void Disconnect() { connected_ = false; }
bool SendRPC(const std::string &recipient,
const PeerRPCRequest<State> &request, PeerRPCReply &reply) {
if (!connected_) {
return false;
}
reactor::LocalChannelWriter request_channel_writer(recipient, "main",
reactor_->system_);
std::string reply_channel_id = fmt::format("{}", request_id_++);
auto reply_channel = reactor_->Open(reply_channel_id).first;
std::atomic<bool> got_reply{false};
reply_channel->template OnEvent<MPeerRPCReply>([&reply, &got_reply](
const MPeerRPCReply &reply_message, const reactor::Subscription &) {
reply = reply_message.reply;
got_reply = true;
});
request_channel_writer.Send<MPeerRPCRequest>(
MPeerRPCRequest(request, id_, reply_channel_id));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
return got_reply;
}
private:
struct MPeerRPCRequest : public communication::reactor::Message {
MPeerRPCRequest(const PeerRPCRequest<State> &request,
const std::string &sender, const std::string &reply_channel)
: request(request), sender(sender), reply_channel(reply_channel) {}
PeerRPCRequest<State> request;
std::string sender;
std::string reply_channel;
};
struct MPeerRPCReply : public communication::reactor::Message {
MPeerRPCReply(const PeerRPCReply &reply) : reply(reply) {}
PeerRPCReply reply;
};
std::string id_;
std::unique_ptr<communication::reactor::Reactor> reactor_;
std::atomic<bool> connected_;
std::atomic<int> request_id_{0};
};
struct MShutdown : public Message {};
template <class State>
class RaftMemberLocalReactor {
public:
explicit RaftMemberLocalReactor(communication::reactor::System &system,
RaftStorageInterface<State> &storage,
const std::string &id,
const RaftConfig &config)
: network_(system, id,
[this](const PeerRPCRequest<State> &request) -> PeerRPCReply {
return this->OnRPC(request);
}),
member_(network_, storage, id, config) {}
void Connect() { network_.Connect(); }
void Disconnect() { network_.Disconnect(); }
virtual ~RaftMemberLocalReactor() {}
private:
LocalReactorNetworkInterface<State> network_;
PeerRPCReply OnRPC(const PeerRPCRequest<State> &request) {
PeerRPCReply reply;
if (request.type == PeerRPCRequest<State>::Type::REQUEST_VOTE) {
reply.type = PeerRPCReply::Type::REQUEST_VOTE;
reply.request_vote = member_.OnRequestVote(request.request_vote);
} else if (request.type == PeerRPCRequest<State>::Type::APPEND_ENTRIES) {
reply.type = PeerRPCReply::Type::APPEND_ENTRIES;
reply.append_entries = member_.OnAppendEntries(request.append_entries);
} else {
LOG(FATAL) << "Unknown Raft RPC request type";
}
return reply;
}
protected:
RaftMember<State> member_;
};
} // namespace communication::raft

View File

@@ -0,0 +1,106 @@
#include "communication/raft/raft.hpp"
namespace communication::raft::test_utils {
struct DummyState {
struct Change {
bool operator==(const Change &) const { return true; }
bool operator!=(const Change &) const { return false; }
};
struct Query {};
struct Result {};
};
template <class State>
class NoOpNetworkInterface : public RaftNetworkInterface<State> {
public:
~NoOpNetworkInterface() {}
bool SendRPC(const MemberId &recipient, const PeerRPCRequest<State> &request,
PeerRPCReply &reply) {
return false;
}
};
template <class State>
class NextReplyNetworkInterface : public RaftNetworkInterface<State> {
public:
~NextReplyNetworkInterface() {}
bool SendRPC(const MemberId &recipient, const PeerRPCRequest<State> &request,
PeerRPCReply &reply) {
on_request_(request);
if (!next_reply_) {
return false;
}
reply = *next_reply_;
return true;
}
std::function<void(const PeerRPCRequest<State> &)> on_request_;
std::experimental::optional<PeerRPCReply> next_reply_;
};
template <class State>
class NoOpStorageInterface : public RaftStorageInterface<State> {
public:
NoOpStorageInterface() {}
void WriteTermAndVotedFor(const TermId,
const std::experimental::optional<std::string> &) {}
std::pair<TermId, std::experimental::optional<MemberId>>
GetTermAndVotedFor() {
return {0, {}};
}
void AppendLogEntry(const LogEntry<State> &) {}
TermId GetLogTerm(const LogIndex) { return 0; }
LogEntry<State> GetLogEntry(const LogIndex) { assert(false); }
std::vector<LogEntry<State>> GetLogSuffix(const LogIndex) { return {}; }
LogIndex GetLastLogIndex() { return 0; }
void TruncateLogSuffix(const LogIndex) {}
TermId term_;
std::experimental::optional<MemberId> voted_for_;
std::vector<LogEntry<State>> log_;
};
template <class State>
class InMemoryStorageInterface : public RaftStorageInterface<State> {
public:
InMemoryStorageInterface(
const TermId term,
const std::experimental::optional<std::string> &voted_for,
const std::vector<LogEntry<State>> log)
: term_(term), voted_for_(voted_for), log_(log) {}
void WriteTermAndVotedFor(
const TermId term,
const std::experimental::optional<std::string> &voted_for) {
term_ = term;
voted_for_ = voted_for;
}
std::pair<TermId, std::experimental::optional<MemberId>>
GetTermAndVotedFor() {
return {term_, voted_for_};
}
void AppendLogEntry(const LogEntry<State> &entry) { log_.push_back(entry); }
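/* Log indices are 1-based; index 0 denotes the empty log prefix and has term 0. */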
TermId GetLogTerm(const LogIndex index) {
return index > 0 ? log_[index - 1].term : 0;
}
LogEntry<State> GetLogEntry(const LogIndex index) { return log_[index - 1]; }
std::vector<LogEntry<State>> GetLogSuffix(const LogIndex index) {
return std::vector<LogEntry<State>>(log_.begin() + index - 1, log_.end());
}
LogIndex GetLastLogIndex(void) { return log_.size(); }
void TruncateLogSuffix(const LogIndex index) {
log_.erase(log_.begin() + index - 1, log_.end());
}
TermId term_;
std::experimental::optional<MemberId> voted_for_;
std::vector<LogEntry<State>> log_;
};
}  // namespace communication::raft::test_utils

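For orientation (not part of this commit), the sketch below shows how the 1-based log API of `InMemoryStorageInterface` behaves; the function name `StorageExample` is illustrative only.

#include "communication/raft/test_utils.hpp"

using namespace communication::raft;
using namespace communication::raft::test_utils;

void StorageExample() {
  // Term 1, no vote cast yet, two log entries from term 1 (as in the unit tests).
  InMemoryStorageInterface<DummyState> storage(1, {}, {{1}, {1}});
  // Indices are 1-based: GetLastLogIndex() == 2, GetLogTerm(0) == 0,
  // GetLogTerm(2) == 1.
  storage.AppendLogEntry({2});   // appended as index 3, term 2
  // GetLogSuffix(2) now returns the entries at indices 2 and 3.
  storage.TruncateLogSuffix(3);  // removes the entry at index 3 again
}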
View File

@@ -3,40 +3,46 @@
#include "fmt/format.h"
#include "glog/logging.h"
#include "communication/raft/raft.hpp"
#include "communication/raft/raft_network.hpp"
#include "communication/raft/raft_reactor.hpp"
#include "communication/raft/test_utils.hpp"
using std::chrono::milliseconds;
using std::experimental::optional;
using namespace communication::raft;
using namespace communication::raft::test_utils;
using namespace communication::reactor;
using namespace std::chrono_literals;
class RaftMemberTest : RaftMember {
template <class State>
class RaftMemberTest : public RaftMemberLocalReactor<State> {
public:
std::string Id() const { return id_; }
optional<std::string> Leader() const { return leader_; }
MemberId Id() { return member_.Id(); }
std::experimental::optional<MemberId> Leader() { return member_.Leader(); }
using RaftMember::RaftMember;
private:
using RaftMemberLocalReactor<State>::RaftMemberLocalReactor;
using RaftMemberLocalReactor<State>::member_;
};
using RaftMemberDummy = RaftMemberTest<DummyState>;
milliseconds InitialElection(const RaftConfig &config) {
System sys;
FakeNetworkInterface network(sys);
NoOpStorageInterface<DummyState> storage;
std::chrono::system_clock::time_point start, end;
LOG(INFO) << "Starting..." << std::endl;
{
std::vector<std::unique_ptr<RaftMemberTest>> members;
std::vector<std::unique_ptr<RaftMemberDummy>> members;
start = std::chrono::system_clock::now();
for (const auto &member_id : config.members) {
members.push_back(
std::make_unique<RaftMemberTest>(sys, member_id, config, network));
network.Connect(member_id);
std::make_unique<RaftMemberDummy>(sys, storage, member_id, config));
members.back()->Connect();
}
bool leader_elected = false;
@@ -56,19 +62,19 @@ milliseconds InitialElection(const RaftConfig &config) {
milliseconds Reelection(const RaftConfig &config) {
System sys;
FakeNetworkInterface network(sys);
NoOpStorageInterface<DummyState> storage;
std::chrono::system_clock::time_point start, end;
LOG(INFO) << "Starting..." << std::endl;
{
std::vector<std::unique_ptr<RaftMemberTest>> members;
std::vector<std::unique_ptr<RaftMemberDummy>> members;
for (const auto &member_id : config.members) {
members.push_back(
std::make_unique<RaftMemberTest>(sys, member_id, config, network));
network.Connect(member_id);
std::make_unique<RaftMemberDummy>(sys, storage, member_id, config));
members.back()->Connect();
}
bool leader_elected = false;
@@ -87,7 +93,12 @@ milliseconds Reelection(const RaftConfig &config) {
std::this_thread::sleep_for(config.heartbeat_interval);
start = std::chrono::system_clock::now();
network.Disconnect(first_leader);
for (const auto &member : members) {
if (member->Id() == first_leader) {
member->Disconnect();
break;
}
}
leader_elected = false;
do {

View File

@@ -1,78 +1,617 @@
#include "gtest/gtest.h"
#include <chrono>
#include <experimental/optional>
#include <thread>
#include "communication/raft/raft.hpp"
#include "communication/raft/raft_network.hpp"
#include "communication/reactor/reactor_local.hpp"
#include "communication/raft/test_utils.hpp"
using namespace std::chrono_literals;
using namespace communication::raft;
using namespace communication::raft::test_utils;
class RaftMemberTest : RaftMember {
using testing::Values;
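/* RaftConfig field order: {members, leader_timeout_min, leader_timeout_max,
* heartbeat_interval}. */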
const RaftConfig test_config2{{"a", "b"}, 150ms, 300ms, 70ms};
const RaftConfig test_config3{{"a", "b", "c"}, 150ms, 300ms, 70ms};
const RaftConfig test_config5{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms};
using communication::raft::impl::RaftMemberImpl;
using communication::raft::impl::RaftMode;
class RaftMemberTest : public ::testing::Test {
public:
std::string Id() { return id_; }
std::string Leader() { return *leader_; }
RaftMemberTest()
: storage_(1, "a", {}), member(network_, storage_, "a", test_config5) {}
using RaftMember::RaftMember;
void SetLog(std::vector<LogEntry<DummyState>> log) {
storage_.log_ = std::move(log);
}
NoOpNetworkInterface<DummyState> network_;
InMemoryStorageInterface<DummyState> storage_;
RaftMemberImpl<DummyState> member;
};
const RaftConfig test_config{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms};
TEST(Raft, InitialElection) {
communication::reactor::System sys;
FakeNetworkInterface network(sys);
{
std::vector<std::unique_ptr<RaftMemberTest>> members;
for (const auto &member_id : test_config.members) {
members.push_back(std::make_unique<RaftMemberTest>(sys, member_id,
test_config, network));
network.Connect(member_id);
}
std::this_thread::sleep_for(500ms);
std::string leader = members[0]->Leader();
for (const auto &member : members) {
EXPECT_EQ(member->Leader(), leader);
}
}
TEST_F(RaftMemberTest, Constructor) {
EXPECT_EQ(member.mode_, RaftMode::FOLLOWER);
EXPECT_EQ(member.term_, 1);
EXPECT_EQ(*member.voted_for_, "a");
EXPECT_EQ(member.commit_index_, 0);
}
TEST(Raft, Reelection) {
communication::reactor::System sys;
FakeNetworkInterface network(sys);
TEST_F(RaftMemberTest, CandidateOrLeaderTransitionToFollower) {
member.mode_ = RaftMode::CANDIDATE;
member.CandidateTransitionToLeader();
{
std::vector<std::unique_ptr<RaftMemberTest>> members;
for (const auto &member_id : test_config.members) {
members.push_back(std::make_unique<RaftMemberTest>(sys, member_id,
test_config, network));
network.Connect(member_id);
member.CandidateOrLeaderTransitionToFollower();
EXPECT_EQ(member.mode_, RaftMode::FOLLOWER);
EXPECT_EQ(member.leader_, std::experimental::nullopt);
EXPECT_LT(member.next_election_time_, TimePoint::max());
}
TEST_F(RaftMemberTest, CandidateTransitionToLeader) {
member.mode_ = RaftMode::CANDIDATE;
member.CandidateTransitionToLeader();
EXPECT_EQ(member.mode_, RaftMode::LEADER);
EXPECT_EQ(*member.leader_, "a");
EXPECT_EQ(member.next_election_time_, TimePoint::max());
}
TEST_F(RaftMemberTest, StartNewElection) {
member.StartNewElection();
EXPECT_EQ(member.mode_, RaftMode::CANDIDATE);
EXPECT_EQ(member.term_, 2);
EXPECT_EQ(member.voted_for_, member.id_);
}
TEST_F(RaftMemberTest, CountVotes) {
member.StartNewElection();
EXPECT_FALSE(member.CountVotes());
member.peer_states_["b"]->voted_for_me = true;
EXPECT_FALSE(member.CountVotes());
member.peer_states_["c"]->voted_for_me = true;
EXPECT_TRUE(member.CountVotes());
}
TEST_F(RaftMemberTest, AdvanceCommitIndex) {
SetLog({{1}, {1}, {1}, {1}, {2}, {2}, {2}, {2}});
member.mode_ = RaftMode::LEADER;
member.term_ = 2;
member.peer_states_["b"]->match_index = 4;
member.peer_states_["c"]->match_index = 4;
EXPECT_EQ(member.commit_index_, 0);
member.AdvanceCommitIndex();
EXPECT_EQ(member.commit_index_, 0);
member.peer_states_["b"]->match_index = 4;
member.peer_states_["c"]->match_index = 4;
member.AdvanceCommitIndex();
EXPECT_EQ(member.commit_index_, 0);
member.peer_states_["b"]->match_index = 5;
member.AdvanceCommitIndex();
EXPECT_EQ(member.commit_index_, 0);
member.peer_states_["c"]->match_index = 5;
member.AdvanceCommitIndex();
EXPECT_EQ(member.commit_index_, 5);
member.peer_states_["d"]->match_index = 6;
member.peer_states_["e"]->match_index = 7;
member.AdvanceCommitIndex();
EXPECT_EQ(member.commit_index_, 6);
member.peer_states_["c"]->match_index = 8;
member.AdvanceCommitIndex();
EXPECT_EQ(member.commit_index_, 7);
member.peer_states_["a"]->match_index = 8;
member.AdvanceCommitIndex();
EXPECT_EQ(member.commit_index_, 8);
}
TEST(RequestVote, SimpleElection) {
NextReplyNetworkInterface<DummyState> network;
InMemoryStorageInterface<DummyState> storage(1, {}, {{1}, {1}});
RaftMemberImpl<DummyState> member(network, storage, "a", test_config5);
member.StartNewElection();
std::unique_lock<std::mutex> lock(member.mutex_);
PeerRPCReply next_reply;
next_reply.type = PeerRPCReply::Type::REQUEST_VOTE;
network.on_request_ = [](const PeerRPCRequest<DummyState> &request) {
ASSERT_EQ(request.type, PeerRPCRequest<DummyState>::Type::REQUEST_VOTE);
ASSERT_EQ(request.request_vote.candidate_term, 2);
ASSERT_EQ(request.request_vote.candidate_id, "a");
};
/* member 'b' first voted for us */
next_reply.request_vote.term = 2;
next_reply.request_vote.vote_granted = true;
network.next_reply_ = next_reply;
member.RequestVote("b", *member.peer_states_["b"], lock);
EXPECT_EQ(member.mode_, RaftMode::CANDIDATE);
EXPECT_TRUE(member.peer_states_["b"]->request_vote_done);
EXPECT_TRUE(member.peer_states_["b"]->voted_for_me);
/* member 'c' didn't */
next_reply.request_vote.vote_granted = false;
network.next_reply_ = next_reply;
member.RequestVote("c", *member.peer_states_["c"], lock);
EXPECT_TRUE(member.peer_states_["c"]->request_vote_done);
EXPECT_FALSE(member.peer_states_["c"]->voted_for_me);
EXPECT_EQ(member.mode_, RaftMode::CANDIDATE);
/* but member 'd' did */
next_reply.request_vote.vote_granted = true;
network.next_reply_ = next_reply;
member.RequestVote("d", *member.peer_states_["d"], lock);
EXPECT_TRUE(member.peer_states_["d"]->request_vote_done);
EXPECT_TRUE(member.peer_states_["d"]->voted_for_me);
EXPECT_EQ(member.mode_, RaftMode::LEADER);
/* no-op entry should be at the end of leader's log */
EXPECT_EQ(storage.log_.back().term, 2);
EXPECT_EQ(storage.log_.back().command, std::experimental::nullopt);
}
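/* Leader 'a' (term 3) brings follower 'b' up to date: every failed
 * consistency check decrements next_index while log entries are suppressed,
 * a successful reply advances match_index, and newly appended entries are
 * then replicated in a single AppendEntries call. */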
TEST(AppendEntries, SimpleLogSync) {
NextReplyNetworkInterface<DummyState> network;
InMemoryStorageInterface<DummyState> storage(3, {}, {{1}, {1}, {2}, {3}});
RaftMemberImpl<DummyState> member(network, storage, "a", test_config2);
member.mode_ = RaftMode::LEADER;
std::unique_lock<std::mutex> lock(member.mutex_);
PeerRPCReply reply;
reply.type = PeerRPCReply::Type::APPEND_ENTRIES;
reply.append_entries.term = 3;
reply.append_entries.success = false;
network.next_reply_ = reply;
LogIndex expected_prev_log_index;
TermId expected_prev_log_term;
std::vector<LogEntry<DummyState>> expected_entries;
network.on_request_ = [&](const PeerRPCRequest<DummyState> &request) {
EXPECT_EQ(request.type, PeerRPCRequest<DummyState>::Type::APPEND_ENTRIES);
EXPECT_EQ(request.append_entries.leader_term, 3);
EXPECT_EQ(request.append_entries.leader_id, "a");
EXPECT_EQ(request.append_entries.prev_log_index, expected_prev_log_index);
EXPECT_EQ(request.append_entries.prev_log_term, expected_prev_log_term);
EXPECT_EQ(request.append_entries.entries, expected_entries);
};
/* initial state after election */
auto &peer_state = *member.peer_states_["b"];
peer_state.match_index = 0;
peer_state.next_index = 5;
peer_state.suppress_log_entries = true;
/* send a heartbeat and find out logs don't match */
expected_prev_log_index = 4;
expected_prev_log_term = 3;
expected_entries = {};
member.AppendEntries("b", peer_state, lock);
EXPECT_EQ(peer_state.match_index, 0);
EXPECT_EQ(peer_state.next_index, 4);
EXPECT_EQ(member.commit_index_, 0);
/* move `next_index` until we find a match, `expected_entries` will be empty
* because `suppress_log_entries` will be true */
expected_entries = {};
expected_prev_log_index = 3;
expected_prev_log_term = 2;
member.AppendEntries("b", peer_state, lock);
EXPECT_EQ(peer_state.match_index, 0);
EXPECT_EQ(peer_state.next_index, 3);
EXPECT_EQ(peer_state.suppress_log_entries, true);
EXPECT_EQ(member.commit_index_, 0);
expected_prev_log_index = 2;
expected_prev_log_term = 1;
member.AppendEntries("b", peer_state, lock);
EXPECT_EQ(peer_state.match_index, 0);
EXPECT_EQ(peer_state.next_index, 2);
EXPECT_EQ(peer_state.suppress_log_entries, true);
EXPECT_EQ(member.commit_index_, 0);
/* we found a match */
reply.append_entries.success = true;
network.next_reply_ = reply;
expected_prev_log_index = 1;
expected_prev_log_term = 1;
member.AppendEntries("b", peer_state, lock);
EXPECT_EQ(peer_state.match_index, 1);
EXPECT_EQ(peer_state.next_index, 2);
EXPECT_EQ(peer_state.suppress_log_entries, false);
EXPECT_EQ(member.commit_index_, 4);
/* now sync them */
expected_prev_log_index = 1;
expected_prev_log_term = 1;
expected_entries = {{1}, {2}, {3}};
member.AppendEntries("b", peer_state, lock);
EXPECT_EQ(peer_state.match_index, 4);
EXPECT_EQ(peer_state.next_index, 5);
EXPECT_EQ(peer_state.suppress_log_entries, false);
EXPECT_EQ(member.commit_index_, 4);
/* heartbeat after successful log sync */
expected_prev_log_index = 4;
expected_prev_log_term = 3;
expected_entries = {};
member.AppendEntries("b", peer_state, lock);
EXPECT_EQ(peer_state.match_index, 4);
EXPECT_EQ(peer_state.next_index, 5);
EXPECT_EQ(member.commit_index_, 4);
/* replicate a newly appended entry */
storage.AppendLogEntry({3});
expected_prev_log_index = 4;
expected_prev_log_term = 3;
expected_entries = {{3}};
member.AppendEntries("b", peer_state, lock);
EXPECT_EQ(peer_state.match_index, 5);
EXPECT_EQ(peer_state.next_index, 6);
EXPECT_EQ(member.commit_index_, 5);
}
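/* Fixture for parameterized tests of the RPC handlers. It holds the storage
 * of member 'a' under test and the storage of a hypothetical peer 'b' from
 * which the incoming request is built. */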
template <class TestParam>
class RaftMemberParamTest : public ::testing::TestWithParam<TestParam> {
public:
virtual void SetUp() {
/* Some checks to verify that the test case itself is valid. */
/* Member's term should be greater than or equal to its last log term. */
ASSERT_GE(storage_.term_, storage_.GetLogTerm(storage_.GetLastLogIndex()));
ASSERT_GE(peer_storage_.term_,
peer_storage_.GetLogTerm(peer_storage_.GetLastLogIndex()));
/* If two logs match at some index, the entire prefix should match. */
LogIndex pos =
std::min(storage_.GetLastLogIndex(), peer_storage_.GetLastLogIndex());
for (; pos > 0; --pos) {
if (storage_.GetLogEntry(pos) == peer_storage_.GetLogEntry(pos)) {
break;
}
}
for (; pos > 0; --pos) {
ASSERT_EQ(storage_.GetLogEntry(pos), peer_storage_.GetLogEntry(pos));
}
}
RaftMemberParamTest(InMemoryStorageInterface<DummyState> storage,
InMemoryStorageInterface<DummyState> peer_storage)
: network_(NoOpNetworkInterface<DummyState>()),
storage_(storage),
member_(network_, storage_, "a", test_config3),
peer_storage_(peer_storage) {}
NoOpNetworkInterface<DummyState> network_;
InMemoryStorageInterface<DummyState> storage_;
RaftMemberImpl<DummyState> member_;
InMemoryStorageInterface<DummyState> peer_storage_;
};
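/* `term`, `voted_for` and `log` describe member 'a' receiving the RPC;
 * `peer_term` and `peer_log` describe candidate 'b' issuing RequestVote;
 * `expected_reply` is whether the vote should be granted. */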
struct OnRequestVoteTestParam {
TermId term;
std::experimental::optional<MemberId> voted_for;
std::vector<LogEntry<DummyState>> log;
TermId peer_term;
std::vector<LogEntry<DummyState>> peer_log;
bool expected_reply;
};
class OnRequestVoteTest : public RaftMemberParamTest<OnRequestVoteTestParam> {
public:
OnRequestVoteTest()
: RaftMemberParamTest(
InMemoryStorageInterface<DummyState>(
GetParam().term, GetParam().voted_for, GetParam().log),
InMemoryStorageInterface<DummyState>(GetParam().peer_term, {},
GetParam().peer_log)) {}
virtual ~OnRequestVoteTest() {}
};
TEST_P(OnRequestVoteTest, RequestVoteTest) {
auto reply = member_.OnRequestVote(
{GetParam().peer_term, "b", peer_storage_.GetLastLogIndex(),
peer_storage_.GetLogTerm(peer_storage_.GetLastLogIndex())});
EXPECT_EQ(reply.vote_granted, GetParam().expected_reply);
/* If we accepted the request, our term should be equal to candidate's term
* and voted_for should be set. */
EXPECT_EQ(reply.term,
reply.vote_granted ? GetParam().peer_term : GetParam().term);
EXPECT_EQ(storage_.term_,
reply.vote_granted ? GetParam().peer_term : GetParam().term);
EXPECT_EQ(storage_.voted_for_,
reply.vote_granted ? "b" : GetParam().voted_for);
}
/* Member 'b' is starting an election for term 5 and sending RequestVote RPC
* to 'a'. Logs are empty so log-up-to-date check will always pass. */
INSTANTIATE_TEST_CASE_P(
TermAndVotedForCheck, OnRequestVoteTest,
Values(
/* we didn't vote for anyone in a smaller term -> accept */
OnRequestVoteTestParam{3, {}, {}, 5, {}, true},
/* we voted for someone in smaller term -> accept */
OnRequestVoteTestParam{4, "c", {}, 5, {}, true},
/* equal term but we didn't vote for anyone in it -> accept */
OnRequestVoteTestParam{5, {}, {}, 5, {}, true},
/* equal term but we voted for this candidate -> accept */
OnRequestVoteTestParam{5, "b", {}, 5, {}, true},
/* equal term but we voted for someone else -> decline */
OnRequestVoteTestParam{5, "c", {}, 5, {}, false},
/* larger term and haven't voted for anyone -> decline */
OnRequestVoteTestParam{6, {}, {}, 5, {}, false},
/* larger term and we already voted -> decline */
OnRequestVoteTestParam{6, "a", {}, 5, {}, false}));
/* Member 'a' log:
 *   1   2   3   4   5   6
 * | 1 | 1 | 1 | 2 | 3 | 3 |
 *
 * It is in term 5.
 */
/* Member 'b' is sending RequestVote RPC to 'a' for term 8. */
INSTANTIATE_TEST_CASE_P(
LogUpToDateCheck, OnRequestVoteTest,
Values(
/* candidate's last log term is smaller -> decline */
OnRequestVoteTestParam{5,
{},
{{1}, {1}, {1}, {2}, {3}, {3}},
8,
{{1}, {1}, {1}, {2}},
false},
/* candidate's last log term is smaller -> decline */
OnRequestVoteTestParam{5,
{},
{{1}, {1}, {1}, {2}, {3}, {3}},
8,
{{1}, {1}, {1}, {2}, {2}, {2}, {2}},
false},
/* candidate's last log term is equal, but our log is longer -> decline */
OnRequestVoteTestParam{5,
{},
{{1}, {1}, {1}, {2}, {3}, {3}},
8,
{{1}, {1}, {1}, {2}, {3}},
false},
/* equal logs -> accept */
OnRequestVoteTestParam{5,
{},
{{1}, {1}, {1}, {2}, {3}, {3}},
8,
{{1}, {1}, {1}, {2}, {3}, {3}},
true},
/* candidate's last log term is larger, even though its log is shorter -> accept */
OnRequestVoteTestParam{5,
{},
{{1}, {1}, {1}, {2}, {3}, {3}},
8,
{{1}, {1}, {1}, {2}, {4}},
true},
/* equal terms, but candidate's log is longer -> accept */
OnRequestVoteTestParam{5,
{},
{{1}, {1}, {1}, {2}, {3}, {3}},
8,
{{1}, {1}, {1}, {2}, {3}, {3}, {3}},
true},
/* candidate's last log term is larger -> accept */
OnRequestVoteTestParam{5,
{},
{{1}, {1}, {1}, {2}, {3}, {3}},
8,
{{1}, {2}, {3}, {4}, {5}},
true}));
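/* `peer_next_index` mirrors the sender's next_index for this follower: the
 * request carries prev_log_index = peer_next_index - 1 and all of the peer's
 * log entries from peer_next_index onwards. */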
struct OnAppendEntriesTestParam {
TermId term;
std::vector<LogEntry<DummyState>> log;
TermId peer_term;
std::vector<LogEntry<DummyState>> peer_log;
LogIndex peer_next_index;
bool expected_reply;
TermId expected_term;
std::vector<LogEntry<DummyState>> expected_log;
};
class OnAppendEntriesTest
: public RaftMemberParamTest<OnAppendEntriesTestParam> {
public:
OnAppendEntriesTest()
: RaftMemberParamTest(
InMemoryStorageInterface<DummyState>(GetParam().term, {},
GetParam().log),
InMemoryStorageInterface<DummyState>(GetParam().peer_term, {},
GetParam().peer_log)) {}
virtual ~OnAppendEntriesTest() {}
};
TEST_P(OnAppendEntriesTest, All) {
auto last_log_index = GetParam().peer_next_index - 1;
auto last_log_term = peer_storage_.GetLogTerm(last_log_index);
auto entries = peer_storage_.GetLogSuffix(GetParam().peer_next_index);
auto reply = member_.OnAppendEntries(
{GetParam().peer_term, "b", last_log_index, last_log_term, entries, 0});
EXPECT_EQ(reply.success, GetParam().expected_reply);
EXPECT_EQ(reply.term, GetParam().expected_term);
EXPECT_EQ(storage_.log_, GetParam().expected_log);
}
/* Member 'a' received an AppendEntries RPC from member 'b'. The first few
 * cases exercise only the term and log consistency checks, so the entries
 * carried by the request are not important there; the remaining cases
 * exercise appending and log truncation. */
INSTANTIATE_TEST_CASE_P(
TermAndLogConsistencyCheck, OnAppendEntriesTest,
Values(
/* sender has stale term -> decline */
OnAppendEntriesTestParam{/* my term */ 8,
{{1}, {1}, {2}},
7,
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}},
7,
false,
8,
{{1}, {1}, {2}}},
/* we're missing entries 4, 5 and 6 -> decline, but update term */
OnAppendEntriesTestParam{4,
{{1}, {1}, {2}},
8,
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}},
7,
false,
8,
{{1}, {1}, {2}}},
/* we're missing entry 4 -> decline, but update term */
OnAppendEntriesTestParam{5,
{{1}, {1}, {2}},
8,
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}},
5,
false,
8,
{{1}, {1}, {2}}},
/* log terms don't match at entry 3 -> decline, but update term */
OnAppendEntriesTestParam{5,
{{1}, {1}, {2}},
8,
{{1}, {1}, {3}, {3}, {4}, {5}, {5}, {6}},
4,
false,
8,
{{1}, {1}, {2}}},
/* logs match -> accept and update term */
OnAppendEntriesTestParam{5,
{{1}, {1}, {2}},
8,
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}},
4,
true,
8,
{{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}}},
/* now follow some log truncation tests */
/* no truncation, append a single entry */
OnAppendEntriesTestParam{
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
9,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
/* no truncation, append multiple entries */
OnAppendEntriesTestParam{
8,
{{1}, {1}, {1}, {4}},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
4,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
/* no truncation, leader's log is prefix of ours */
OnAppendEntriesTestParam{
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {6}},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
4,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {6}}},
/* another one, now with entries from newer term */
OnAppendEntriesTestParam{
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {7}, {7}},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
4,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {7}, {7}}},
/* no truncation, partial match between our log and appended entries
*/
OnAppendEntriesTestParam{
8,
{{1}, {1}, {1}, {4}, {4}, {5}},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
4,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
/* truncate suffix */
OnAppendEntriesTestParam{
8,
{{1}, {1}, {1}, {4}, {4}, {4}, {4}},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
5,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
/* truncate suffix, with partial match between our log and appended
entries */
OnAppendEntriesTestParam{
8,
{{1}, {1}, {1}, {4}, {4}, {4}, {4}},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
4,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
/* delete whole log */
OnAppendEntriesTestParam{
8,
{{5}},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
1,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}},
/* append on empty log */
OnAppendEntriesTestParam{
8,
{},
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}},
1,
true,
8,
{{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}));