Continue filling out AppendEntries state transition logic

This commit is contained in:
Tyler Neely 2022-07-14 13:06:54 +00:00
parent dc78adde40
commit 5e98971bb2

View File

@ -134,13 +134,23 @@ class Server {
Role role_ = Candidate{};
Io<IoImpl> io_;
std::vector<Address> peers_;
Time last_heard_from_leader_;
Duration RandomTimeout(Duration min, Duration max) {
std::uniform_int_distribution<> time_distrib(min, max);
return io_.Rand(time_distrib);
}
LogIndex CommittedLogIndex() { return state_.commit_index; }
Term CommittedLogTerm() {
if (state_.log.empty()) {
return 0;
} else {
auto &[term, data] = state_.log.at(state_.commit_index);
return term;
}
}
LogIndex LastLogIndex() { return state_.log.size(); }
Term LastLogTerm() {
@ -364,55 +374,63 @@ class Server {
return std::nullopt;
}
std::optional<Role> Handle(Follower &follower, AppendRequest &&aer, RequestId request_id, Address from_address) {
if (follower.leader_address != from_address) {
} else if (aer.term != state_.term) {
} else {
follower.last_received_append_entries_timestamp = io_.Now();
}
std::optional<Role> Handle(Follower &follower, AppendRequest &&req, RequestId request_id, Address from_address) {
AppendResponse res{
.success = false,
.last_log_term = CommittedLogTerm(),
.last_log_index = CommittedLogIndex(),
};
/*
if (from_address != state_.voted_for) {
Log("req.from_address is not who we voted for");
error |= true;
} else if (aer.term != state_.term) {
Log("req.term differs from our current leader term");
error |= true;
} else if (aer.last_log_index > state_.log.size()) {
// Handle early-exit conditions.
if (req.term > state_.term) {
// become follower of this leader, reply with our log status
state_.term = req.term;
io_.Send(from_address, request_id, res);
return Follower{
.last_received_append_entries_timestamp = io_.Now(),
.leader_address = from_address,
};
} else if (req.term < state_.term) {
// nack this request from an old leader
io_.Send(from_address, request_id, res);
return std::nullopt;
};
MG_ASSERT(follower.leader_address == from_address, "Multiple Leaders are acting under the same term number!");
follower.last_received_append_entries_timestamp = io_.Now();
// Handle steady-state conditions.
if (req.last_log_index != LastLogIndex()) {
Log("req.last_log_index is above our last applied log index");
// TODO: buffer this and apply it later rather than having to wait for
// the leader to double-send future segments to us.
error |= true;
} else if (req.last_log_term != LastLogTerm()) {
Log("req.last_log_term differs from our leader term at that slot");
} else {
auto [last_log_term, data] = state_.log.at(aer.last_log_index);
if (aer.last_log_term != last_log_term) {
Log("req.last_log_term differs from our leader term at that slot");
error |= true;
}
}
if (!error) {
// happy path
last_heard_from_leader_ = io_.Now();
Log("Follower applying batch of entries to log");
// possibly chop-off stuff that was replaced by
// things with different terms (we got data that
// hasn't reached consensus yet, which is normal)
// MG_ASSERT(req.last_log_index > state_.commit_index);
state_.log.resize(aer.last_log_index);
state_.log.resize(req.last_log_index);
state_.log.insert(state_.log.end(), aer.entries.begin(), aer.entries.end());
state_.log.insert(state_.log.end(), req.entries.begin(), req.entries.end());
state_.commit_index = std::min(aer.leader_commit, state_.log.size());
state_.commit_index = std::min(req.leader_commit, LastLogIndex());
res.success = true;
res.last_log_term = LastLogTerm();
res.last_log_index = LastLogIndex();
}
*/
io_.Send(from_address, request_id, res);
return std::nullopt;
}
// anyone can receive an AppendRequest and potentially be flipped to a follower
// state.
// Non-Followers can receive an AppendRequest and potentially be flipped to a follower state.
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &, AppendRequest &&req, RequestId request_id, Address from_address) {
Log("non-Follower received Append from a Leader");
@ -436,8 +454,8 @@ class Server {
AppendResponse res{
.success = error,
.last_log_term = state_.term,
.last_log_index = state_.log.size(),
.last_log_term = CommittedLogTerm(),
.last_log_index = CommittedLogIndex(),
};
io_.Send(from_address, request_id, res);