Send log state along with AppendRequest messages from Leader

This commit is contained in:
Tyler Neely 2022-07-14 19:06:56 +00:00
parent d14f7705b1
commit 63beeb8771

View File

@ -158,9 +158,12 @@ class Server {
// 5 -> 3 (index 2)
state_.commit_index = indices[(indices.size() / 2)];
Log("leader commit_index is now ", state_.commit_index);
while (!leader.pending_client_requests.empty()) {
auto &front = leader.pending_client_requests.front();
if (front.log_index <= state_.commit_index) {
Log("Leader responding SUCCESS to client");
ReplicationResponse rr{
.success = true,
.retry_leader = std::nullopt,
@ -174,16 +177,24 @@ class Server {
}
void BroadcastAppendEntries(std::map<Address, FollowerTracker> &followers) {
AppendRequest ar{
.term = state_.term,
.last_log_index = 0,
.last_log_term = 0,
.entries = std::vector<std::pair<Term, Op>>(),
.leader_commit = state_.commit_index,
};
for (auto &[address, follower] : followers) {
LogIndex index = follower.confirmed_contiguous_index;
std::vector<std::pair<Term, Op>> entries;
entries.insert(entries.begin(), state_.log.begin() + index, state_.log.end());
AppendRequest ar{
.term = state_.term,
.last_log_index = index,
.last_log_term = TermAtIndex(index),
.entries = entries,
.leader_commit = state_.commit_index,
};
// request_id not necessary to set because it's not a Future-backed Request.
RequestId request_id = 0;
io_.Send(address, request_id, ar);
}
}
@ -193,6 +204,15 @@ class Server {
return io_.Rand(time_distrib);
}
Term TermAtIndex(LogIndex index) {
if (state_.log.size() <= index) {
return 0;
} else {
auto &[term, data] = state_.log.at(index);
return term;
}
}
LogIndex CommittedLogIndex() { return state_.commit_index; }
Term CommittedLogTerm() {
@ -442,8 +462,8 @@ class Server {
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &role, AppendRequest &&req, RequestId request_id, Address from_address) {
AppendResponse res{
.term = state_.term,
.success = false,
.term = state_.term,
.last_log_term = CommittedLogTerm(),
.last_log_index = CommittedLogIndex(),
};
@ -480,7 +500,7 @@ class Server {
Log("req.last_log_term differs from our leader term at that slot");
} else {
// happy path
Log("Follower applying batch of entries to log");
Log("Follower applying batch of entries to log of size ", req.entries.size());
// possibly chop-off stuff that was replaced by
// things with different terms (we got data that