Clean up comments and TODOs

This commit is contained in:
Tyler Neely 2022-08-09 14:07:42 +00:00
parent 36c133ce4e
commit d15105455a

View File

@ -11,7 +11,6 @@
// TODO(tyler) buffer out-of-order Append buffers to reassemble more quickly
// TODO(tyler) handle granular batch sizes based on simple flow control
// TODO(tyler) add "application" test that asserts that all state machines apply the same items in-order
// TODO(tyler) add proper token-based deterministic scheduling
#pragma once
@ -77,7 +76,14 @@ struct ReadResponse {
std::optional<Address> retry_leader;
};
// TODO(tyler) add docs
/// AppendRequest is a raft-level message that the Leader
/// periodically broadcasts to all Follower peers. This
/// serves three main roles:
/// 1. acts as a heartbeat from the Leader to the Follower
/// 2. replicates new data that the Leader has received to the Follower
/// 3. informs Follower peers when the commit index has increased,
/// signalling that it is now safe to apply log items to the
/// replicated state machine
template <typename WriteRequest>
struct AppendRequest {
Term term = 0;
@ -127,6 +133,7 @@ struct PendingClientRequest {
LogIndex log_index;
RequestId request_id;
Address address;
Time received_at;
};
struct Leader {
@ -155,40 +162,28 @@ struct Follower {
using Role = std::variant<Candidate, Leader, Follower>;
/*
TODO make concept that expresses the fact that any RSM must
be able to have a specific type applied to it, returning
another interesting result type.
all ReplicatedState classes should have an apply method
all ReplicatedState classes should have an Apply method
that returns our WriteResponseValue:
ReadResponse read(ReadOperation);
WriteResponseValue ReplicatedState::apply(WriteRequest);
ReadResponse Read(ReadOperation);
WriteResponseValue ReplicatedState::Apply(WriteRequest);
for examples:
if the state is uint64_t, and WriteRequest is `struct PlusOne {};`,
and WriteResponseValue is also uint64_t (the new value), then
each call to state.apply(PlusOne{}) will return the new value
each call to state.Apply(PlusOne{}) will return the new value
after incrementing it. 0, 1, 2, 3... and this will be sent back
to the client that requested the mutation.
In practice, these mutations will usually be predicated on some
previous value, so that they are idempotent, functioning similarly
to a CAS operation.
template<typename Write, typename T, typename WriteResponse>
concept Rsm = requires(T t, Write w)
{
{ t.read(r) } -> std::same_as<ReadResponse>;
{ t.apply(w) } -> std::same_as<WriteResponseValue>;
};
*/
template <typename WriteOperation, typename ReadOperation, typename ReplicatedState, typename WriteResponseValue,
typename ReadResponseValue>
concept Rsm = requires(ReplicatedState state, WriteOperation w, ReadOperation r) {
{ state.read(r) } -> std::same_as<ReadResponseValue>;
{ state.apply(w) } -> std::same_as<WriteResponseValue>;
{ state.Read(r) } -> std::same_as<ReadResponseValue>;
{ state.Apply(w) } -> std::same_as<WriteResponseValue>;
};
/// Parameter Purpose
@ -201,7 +196,7 @@ concept Rsm = requires(ReplicatedState state, WriteOperation w, ReadOperation r)
/// identical order across all replicas after an WriteOperation reaches consensus.
/// ReadOperation the type of operations that do not require consensus before executing directly
/// on a const ReplicatedState &
/// ReadResponseValue the return value of calling ReplicatedState::read(ReadOperation), which is executed directly
/// ReadResponseValue the return value of calling ReplicatedState::Read(ReadOperation), which is executed directly
/// without going through consensus first
template <typename IoImpl, typename ReplicatedState, typename WriteOperation, typename WriteResponseValue,
typename ReadOperation, typename ReadResponseValue>
@ -259,31 +254,31 @@ class Raft {
// assuming reverse sort (using std::ranges::greater)
size_t new_committed_log_size = indices[(indices.size() / 2)];
// TODO(tyler / gabor) for each index between the old
// index and the new one, apply that log's WriteOperation
// to our replicated_state_, and use the specific return
// value of the ReplicatedState::apply method (WriteResponseValue)
// to respondto the requester.
// For each index between the old index and the new one,
// Apply that log's WriteOperation to our replicated_state_,
// and use the specific return value of the
// ReplicatedState::Apply method (WriteResponseValue) to
// respond to the requester.
//
// this will completely replace the while loop below
// This will completely replace the while loop below
state_.committed_log_size = new_committed_log_size;
Log("committed_log_size is now ", state_.committed_log_size);
// TODO(tyler) Bug-prone application of writes and responses.
// For now, the pending_client_requests deque
// does correspond to the log order, but this
// will not be true once client requests time out
// over time.
while (!leader.pending_client_requests.empty()) {
const auto &front = leader.pending_client_requests.front();
if (front.log_index <= state_.committed_log_size) {
const auto &write_request = state_.log[front.log_index].second;
WriteResponseValue write_return = replicated_state_.apply(write_request);
WriteResponseValue write_return = replicated_state_.Apply(write_request);
WriteResponse<WriteResponseValue> resp;
resp.success = true;
resp.write_return = write_return;
// Log("responding SUCCESS to client");
// WriteResponse rr{
// .success = true,
// .retry_leader = std::nullopt,
// };
io_.Send(front.address, front.request_id, std::move(resp));
leader.pending_client_requests.pop_front();
} else {
@ -477,7 +472,7 @@ class Raft {
BroadcastAppendEntries(leader.followers);
leader.last_broadcast = now;
}
// TODO(tyler) TimeOutOldClientRequests();
return std::nullopt;
}
@ -668,7 +663,7 @@ class Raft {
Log("req.last_log_term differs from our leader term at that slot, expected: ", LastLogTerm(), " but got ",
req.last_log_term);
} else {
// happy path - apply log
// happy path - Apply log
Log("applying batch of entries to log of size ", req.entries.size());
MG_ASSERT(req.last_log_index >= state_.committed_log_size,
@ -726,11 +721,10 @@ class Raft {
// Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState
std::optional<Role> Handle(Leader &leader, ReadRequest<ReadOperation> &&req, RequestId request_id,
Address from_address) {
// TODO(tyler / gabor) implement
Log("handling ReadOperation");
ReadOperation read_operation = req.operation;
ReadResponseValue read_return = replicated_state_.read(read_operation);
ReadResponseValue read_return = replicated_state_.Read(read_operation);
ReadResponse<ReadResponseValue> resp{
.success = true,
@ -746,8 +740,6 @@ class Raft {
// Candidates should respond with a failure, similar to the Candidate + WriteRequest failure below
std::optional<Role> Handle(Candidate &, ReadRequest<ReadOperation> &&req, RequestId request_id,
Address from_address) {
// TODO(tyler / gabor) implement
Log("received ReadOperation - not redirecting because no Leader is known");
auto res = ReadResponse<ReadResponseValue>{};
@ -760,11 +752,9 @@ class Raft {
return std::nullopt;
}
// Leaders should respond with a redirection, similar to the Follower + WriteRequest response below
// Followers should respond with a redirection, similar to the Follower + WriteRequest response below
std::optional<Role> Handle(Follower &follower, ReadRequest<ReadOperation> &&req, RequestId request_id,
Address from_address) {
// TODO(tyler / gabor) implement
auto res = ReadResponse<ReadResponseValue>{};
res.success = false;
@ -811,7 +801,7 @@ class Raft {
// only leaders actually handle replication requests from clients
std::optional<Role> Handle(Leader &leader, WriteRequest<WriteOperation> &&req, RequestId request_id,
Address from_address) {
Log("received WriteRequest");
Log("handling WriteRequest");
// we are the leader. add item to log and send Append to peers
state_.log.emplace_back(std::pair(state_.term, std::move(req.operation)));
@ -820,13 +810,13 @@ class Raft {
.log_index = state_.log.size() - 1,
.request_id = request_id,
.address = from_address,
.received_at = io_.Now(),
};
leader.pending_client_requests.push_back(pcr);
BroadcastAppendEntries(leader.followers);
// TODO(tyler) add message to pending requests buffer, reply asynchronously
return std::nullopt;
}
};