Fix bug-prone leader application of write requests to the replicated state

Tyler Neely 2022-08-11 12:17:33 +00:00
parent 5b69f435b6
commit 69a2d73172


@@ -19,6 +19,7 @@
 #include <map>
 #include <set>
 #include <thread>
+#include <unordered_map>
 #include <vector>
 #include "io/simulator/simulator.hpp"
@@ -124,7 +125,6 @@ struct FollowerTracker {
 };
 struct PendingClientRequest {
-  LogIndex log_index;
   RequestId request_id;
   Address address;
   Time received_at;
@@ -132,7 +132,7 @@ struct PendingClientRequest {
 struct Leader {
   std::map<Address, FollowerTracker> followers;
-  std::deque<PendingClientRequest> pending_client_requests;
+  std::unordered_map<LogIndex, PendingClientRequest> pending_client_requests;
   Time last_broadcast = Time::min();
   static void Print() { std::cout << "\tLeader \t"; }
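Aside (not part of the diff): switching from a deque to a map keyed by LogIndex is what lets the leader answer or drop pending requests out of arrival order, for example after a client-side timeout. A minimal sketch of that property, using simplified stand-in types rather than the types in this file:

#include <cstdint>
#include <string>
#include <unordered_map>

using LogIndex = uint64_t;

struct Pending {
  uint64_t request_id;
  std::string client;  // stand-in for the real Address type
};

int main() {
  std::unordered_map<LogIndex, Pending> pending;
  pending.emplace(7, Pending{42, "client-a"});
  pending.emplace(8, Pending{43, "client-b"});

  // Request 7 times out and is dropped; request 8 is still found under its
  // own log index. A deque popped from the front would instead assume the
  // remaining entries still follow strict log order.
  pending.erase(7);

  if (auto it = pending.find(8); it != pending.end()) {
    // reply to it->second once log index 8 commits
  }
}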
@@ -244,41 +244,37 @@ class Raft {
       indices.push_back(f.confirmed_contiguous_index);
       Log("at port ", addr.last_known_port, " has confirmed contiguous index of: ", f.confirmed_contiguous_index);
     }
+    // reverse sort from highest to lowest (using std::ranges::greater)
     std::ranges::sort(indices, std::ranges::greater());
-    // assuming reverse sort (using std::ranges::greater)
     size_t new_committed_log_size = indices[(indices.size() / 2)];
-    // For each index between the old index and the new one,
-    // Apply that log's WriteOperation to our replicated_state_,
-    // and use the specific return value of the
-    // ReplicatedState::Apply method (WriteResponseValue) to
-    // respond to the requester.
-    //
-    // This will completely replace the while loop below
+    size_t apply_index = state_.committed_log_size;
     state_.committed_log_size = new_committed_log_size;
-    // TODO(tyler) Bug-prone application of writes and responses.
-    // For now, the pending_client_requests deque
-    // does correspond to the log order, but this
-    // will not be true once client requests time out
-    // over time.
-    while (!leader.pending_client_requests.empty()) {
-      const auto &front = leader.pending_client_requests.front();
-      if (front.log_index <= state_.committed_log_size) {
-        const auto &write_request = state_.log[front.log_index].second;
-        WriteResponseValue write_return = replicated_state_.Apply(write_request);
+    Log("committed_log_size is now ", state_.committed_log_size);
+    // For each index between the old index and the new one (inclusive),
+    // Apply that log's WriteOperation to our replicated_state_,
+    // and use the specific return value of the ReplicatedState::Apply
+    // method (WriteResponseValue) to respond to the requester.
+    for (; apply_index < new_committed_log_size; apply_index++) {
+      const auto &write_request = state_.log[apply_index].second;
+      WriteResponseValue write_return = replicated_state_.Apply(write_request);
+      if (leader.pending_client_requests.contains(apply_index)) {
+        PendingClientRequest client_request = std::move(leader.pending_client_requests.at(apply_index));
         WriteResponse<WriteResponseValue> resp;
         resp.success = true;
         resp.write_return = write_return;
-        io_.Send(front.address, front.request_id, std::move(resp));
-        leader.pending_client_requests.pop_front();
-      } else {
-        break;
+        io_.Send(client_request.address, client_request.request_id, std::move(resp));
+        leader.pending_client_requests.erase(apply_index);
       }
     }
-    Log("committed_log_size is now ", state_.committed_log_size);
   }
   // Raft paper - 5.1
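For readers outside the codebase, the hunk above follows a common Raft-leader pattern: take the middle element of the followers' confirmed indices (after a descending sort) as the new commit point, then apply each newly committed entry in log order and reply only when a pending request is recorded under that index. A rough, self-contained sketch of the same flow with placeholder types, not the file's actual Io, Log, or state-machine APIs:

#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

using LogIndex = uint64_t;

int main() {
  // confirmed_contiguous_index reported by each follower
  std::vector<LogIndex> indices{5, 3, 4};

  // reverse sort from highest to lowest; the middle element is the highest
  // index that a majority of the cluster has confirmed
  std::ranges::sort(indices, std::ranges::greater());
  LogIndex new_committed_log_size = indices[indices.size() / 2];  // 4 here

  std::vector<std::string> log{"a", "b", "c", "d", "e"};    // stand-in operations
  std::unordered_map<LogIndex, int> pending{{2, 42}};       // log index -> request id

  LogIndex committed_log_size = 2;  // what was committed before this round
  for (LogIndex i = committed_log_size; i < new_committed_log_size; ++i) {
    std::cout << "apply " << log[i] << '\n';                 // apply in log order
    if (auto it = pending.find(i); it != pending.end()) {
      std::cout << "reply to request " << it->second << '\n';
      pending.erase(it);
    }
  }
  committed_log_size = new_committed_log_size;
}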
@@ -579,7 +575,7 @@ class Raft {
     return Leader{
         .followers = std::move(followers),
-        .pending_client_requests = std::deque<PendingClientRequest>(),
+        .pending_client_requests = std::unordered_map<LogIndex, PendingClientRequest>(),
     };
   }
@@ -796,14 +792,15 @@ class Raft {
     // we are the leader. add item to log and send Append to peers
     state_.log.emplace_back(std::pair(state_.term, std::move(req.operation)));
+    LogIndex log_index = state_.log.size() - 1;
     PendingClientRequest pcr{
-        .log_index = state_.log.size() - 1,
         .request_id = request_id,
         .address = from_address,
         .received_at = io_.Now(),
     };
-    leader.pending_client_requests.push_back(pcr);
+    leader.pending_client_requests.emplace(log_index, pcr);
     BroadcastAppendEntries(leader.followers);
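The write path that feeds the apply loop, sketched with the same caveat (simplified placeholders, not this file's real message or Io types): append the operation, record the requester under the index of the entry just appended, then broadcast AppendEntries:

#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using Term = uint64_t;
using LogIndex = uint64_t;

struct Pending {
  uint64_t request_id;
  std::string client;  // stand-in for the real Address type
};

struct LeaderSketch {
  std::vector<std::pair<Term, std::string>> log;  // (term, operation)
  std::unordered_map<LogIndex, Pending> pending;  // keyed by log index

  void HandleWrite(Term term, std::string op, uint64_t request_id, std::string from) {
    log.emplace_back(term, std::move(op));
    LogIndex log_index = log.size() - 1;  // index of the entry just appended
    pending.emplace(log_index, Pending{request_id, std::move(from)});
    // the real code would call BroadcastAppendEntries(leader.followers) here
  }
};

int main() {
  LeaderSketch leader;
  leader.HandleWrite(1, "set x = 1", 42, "client-a");
}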