A large number of scattered cleanups. Don't use transport Request in state machine code
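
The state machine no longer blocks on future-backed transport Requests from inside its logic; it fires a Send and consumes the response later as an ordinary message in the main Receive loop. In sketch form (simplified from the diff below):

    // before: Cron blocked on a transport future
    ResponseFuture<RequestVotesResponse> future =
        io_.template Request<RequestVotesRequest, RequestVotesResponse>(peer, std::move(request));
    ResponseResult<RequestVotesResponse> response = future.Wait();

    // after: fire-and-forget; the VoteResponse arrives through Receive<...>
    // in Run() and is dispatched to the role-specific Handle overload
    io_.template Send<VoteRequest>(peer, request_id, request);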

Tyler Neely 2022-07-14 10:50:49 +00:00
parent 1a12a80af0
commit 57533f2746


@@ -11,6 +11,7 @@
#include <iostream>
#include <map>
+#include <set>
#include <thread>
#include <vector>
@@ -19,6 +20,9 @@
using Op = std::vector<uint8_t>;
using Term = uint64_t;
using LogIndex = uint64_t;
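+// Time and Duration are raw microsecond counts (see SetDefaultTimeoutMicroseconds
+// and the 120000-microsecond cron interval below).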
+using Time = uint64_t;
+using Duration = uint64_t;
+using RequestId = uint64_t;
/// The request that a client sends to request that
/// the cluster replicates their data.
@@ -31,7 +35,7 @@ struct ReplicationResponse {
std::optional<Address> retry_leader;
};
-struct AppendEntriesRequest {
+struct AppendRequest {
Term term;
Term prev_log_index;
Term prev_log_term;
@@ -39,7 +43,7 @@ struct AppendEntriesRequest {
Term leader_commit;
};
-struct AppendEntriesResponse {
+struct AppendResponse {
bool success;
Term last_log_term;
// a small optimization over the raft paper, tells
@@ -49,42 +53,45 @@ struct AppendEntriesResponse {
Term last_log_index;
};
-struct RequestVotesRequest {
+struct VoteRequest {
Term term;
LogIndex last_log_index;
Term last_log_term;
};
-struct RequestVotesResponse {
+struct VoteResponse {
Term term;
LogIndex commit_index;
bool vote_granted;
};
struct CommonState {
-Term current_term;
+Term term;
std::optional<Address> voted_for;
std::vector<std::pair<Term, Op>> log;
LogIndex commit_index;
LogIndex last_applied;
-uint64_t randomized_timeout;
};
struct FollowerTracker {
Address address;
LogIndex next_index;
-std::optional<ResponseFuture<AppendEntriesResponse>> in_flight_message;
-uint64_t last_received_append_entries_timestamp = 0;
+std::optional<ResponseFuture<AppendResponse>> in_flight_message;
+Time last_received_append_entries_timestamp = 0;
};
struct Leader {
std::vector<FollowerTracker> followers;
};
-struct Candidate {};
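+// Election bookkeeping: votes granted so far (paired with each voter's
+// commit index), when this election began, and which peers we still await.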
+struct Candidate {
+std::vector<std::pair<Address, LogIndex>> successful_votes;
+Time election_began;
+std::set<Address> outstanding_votes;
+};
struct Follower {
-uint64_t last_received_append_entries_timestamp = 0;
+Time last_received_append_entries_timestamp;
};
using Role = std::variant<Candidate, Leader, Follower>;
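// role-specific behavior is dispatched over this variant with std::visit
// (see Cron() and the Handle() overloads below)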
@@ -96,24 +103,28 @@ class Server {
void Run() {
// 120ms between Cron calls
-uint64_t cron_interval = 120000;
-uint64_t last_cron = 0;
+Duration cron_interval = 120000;
+Time last_cron = 0;
-common_state_.randomized_timeout = RandomTimeout(100000, 150000);
+io_.SetDefaultTimeoutMicroseconds(RandomTimeout(100000, 150000));
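+// the randomized timeout now lives in the Io layer rather than in CommonState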
while (!io_.ShouldShutDown()) {
auto now = io_.Now();
if (now - last_cron > cron_interval) {
Log("doing cron");
Cron();
last_cron = now;
}
-auto request_result = io_.template Receive<AppendEntriesRequest, RequestVotesRequest, ReplicationRequest>();
+Log("waiting for requests");
+auto request_result = io_.template Receive<AppendRequest, VoteRequest, VoteResponse, ReplicationRequest>();
if (request_result.HasError()) {
continue;
}
Log("received request");
auto request = std::move(request_result.GetValue());
Handle(std::move(request.message), request.request_id, request.from_address);
@@ -121,118 +132,80 @@ class Server {
}
private:
-CommonState common_state_;
+CommonState state_;
Role role_ = Candidate{};
Io<IoImpl> io_;
std::vector<Address> peers_;
-uint64_t last_heard_from_leader_;
+Time last_heard_from_leader_;
-uint64_t RandomTimeout(uint64_t min, uint64_t max) {
+Duration RandomTimeout(Duration min, Duration max) {
std::uniform_int_distribution<> time_distrib(min, max);
return io_.Rand(time_distrib);
}
-LogIndex LastLogIndex() { return common_state_.log.size(); }
+LogIndex LastLogIndex() { return state_.log.size(); }
Term LastLogTerm() {
-if (common_state_.log.empty()) {
+if (state_.log.empty()) {
return 0;
} else {
-auto &[term, data] = common_state_.log.back();
+auto &[term, data] = state_.log.back();
return term;
}
}
-/// Bump term and broadcast RequestVotes to all peers and return a vector of response futures
-std::vector<ResponseFuture<RequestVotesResponse>> BroadcastVotes() {
-std::vector<ResponseFuture<RequestVotesResponse>> ret{};
+// Candidates keep sending Vote to peers until:
+// 1. receiving Append with a higher term (become Follower)
+// 2. receiving Vote with a higher term (become a Follower)
+// 3. receiving a quorum of responses to our last batch of Vote (become a Leader)
+std::optional<Role> Cron(Candidate &candidate) {
+auto now = io_.Now();
+Duration election_timeout = RandomTimeout(100000, 150000);
-common_state_.current_term++;
+if (now - candidate.election_began > election_timeout) {
+Log("running for election");
+state_.term++;
+VoteRequest request{
+.term = state_.term,
+.last_log_index = LastLogIndex(),
+.last_log_term = LastLogTerm(),
+};
-RequestVotesRequest request{
-.term = common_state_.current_term,
-.last_log_index = LastLogIndex(),
-.last_log_term = LastLogTerm(),
-};
+auto outstanding_votes = std::set<Address>();
-for (const auto &peer : peers_) {
-ResponseFuture<RequestVotesResponse> future =
-io_.template Request<RequestVotesRequest, RequestVotesResponse>(peer, std::move(request));
-ret.emplace_back(std::move(future));
-}
-return ret;
-}
-std::optional<Role> RunForElection() {
-std::vector<ResponseFuture<RequestVotesResponse>> outstanding_votes = BroadcastVotes();
-int successes = 0;
-bool success = false;
-auto peer_commit_indices = std::map<Address, int>();
-for (const auto &peer : peers_) {
-peer_commit_indices.insert({peer, 0});
-}
-for (auto &&future : std::move(outstanding_votes)) {
-ResponseResult<RequestVotesResponse> response = future.Wait();
-if (response.HasError()) {
-// timed out
-continue;
+for (const auto &peer : peers_) {
+// request_id not necessary to set because it's not a Future-backed Request.
+auto request_id = 0;
+io_.template Send<VoteRequest>(peer, request_id, request);
+outstanding_votes.insert(peer);
+}
-ResponseEnvelope<RequestVotesResponse> res_env = std::move(response).GetValue();
-if (res_env.message.vote_granted) {
-peer_commit_indices.insert_or_assign(res_env.from_address, res_env.message.commit_index);
-successes++;
-if (successes > (peers_.size() / 2)) {
-success = true;
-break;
-}
-}
-}
-if (success) {
-Log("ELECTED");
-return Leader{};
-} else {
-return Candidate{};
+return Candidate{
+.successful_votes = std::vector<std::pair<Address, LogIndex>>(),
+.election_began = now,
+.outstanding_votes = outstanding_votes,
+};
}
+return std::nullopt;
}
-/// Periodic protocol maintenance.
-void Cron() {
-// dispatch periodic logic based on our role to a specific Cron method.
-std::optional<Role> new_role = std::visit([&](auto &&role) { return Cron(role); }, role_);
-if (new_role) {
-Log("becoming new role");
-role_ = std::move(new_role).value();
-}
-}
-// Candidates keep sending RequestVotes to peers until:
-// 1. receiving AppendEntries with a higher term (become Follower)
-// 2. receiving RequestVotes with a higher term (become a Follower)
-// 3. receiving a quorum of responses to our last batch of RequestVotes (become a Leader)
-std::optional<Role> Cron(Candidate &candidate) { return RunForElection(); }
// Followers become candidates if we haven't heard from the leader
// after a randomized timeout.
std::optional<Role> Cron(Follower &follower) {
auto now = io_.Now();
auto time_since_last_append_entries = now - follower.last_received_append_entries_timestamp;
+Duration election_timeout = RandomTimeout(100000, 150000);
// randomized follower timeout with a range of 100-150ms.
-if (time_since_last_append_entries > RandomTimeout(100000, 150000)) {
+if (time_since_last_append_entries > election_timeout) {
// become a Candidate if we haven't heard from the Leader after this timeout
return Candidate{};
} else {
return std::nullopt;
}
}
-// Leaders (re)send AppendEntriesRequest to followers.
+// Leaders (re)send AppendRequest to followers.
std::optional<Role> Cron(Leader &) {
// TODO time-out client requests if we haven't made progress after some threshold
return std::nullopt;
@@ -247,8 +220,8 @@ class Server {
/// to its role, and as the second argument, the
/// message that has been received.
/// **********************************************
-void Handle(std::variant<AppendEntriesRequest, RequestVotesRequest, ReplicationRequest> &&message_variant,
-uint64_t request_id, Address from_address) {
+void Handle(std::variant<AppendRequest, VoteRequest, VoteResponse, ReplicationRequest> &&message_variant,
+RequestId request_id, Address from_address) {
// dispatch the message to a handler based on our role,
// which can be specified in the Handle first argument,
// or it can be `auto` if it's a handler for several roles
@@ -264,18 +237,18 @@ class Server {
}
}
-// all roles can receive RequestVotes and possibly become a follower
+// all roles can receive Vote and possibly become a follower
template <typename AllRoles>
-std::optional<Role> Handle(AllRoles &, RequestVotesRequest &&req, uint64_t request_id, Address from_address) {
-Log("RECEIVED RequestVotes :)");
+std::optional<Role> Handle(AllRoles &, VoteRequest &&req, RequestId request_id, Address from_address) {
+Log("RECEIVED Vote :)");
bool last_log_term_dominates = req.last_log_term >= LastLogTerm();
-bool term_dominates = req.term > common_state_.current_term;
+bool term_dominates = req.term > state_.term;
bool last_log_index_dominates = req.last_log_index >= LastLogIndex();
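// grant the vote only to a requester with a strictly newer term whose log
// is at least as up-to-date as ours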
bool new_leader = last_log_term_dominates && term_dominates && last_log_index_dominates;
-RequestVotesResponse res{
-.term = std::max(req.term, common_state_.current_term),
-.commit_index = common_state_.commit_index,
+VoteResponse res{
+.term = std::max(req.term, state_.term),
+.commit_index = state_.commit_index,
.vote_granted = new_leader,
};
@@ -283,8 +256,8 @@ class Server {
if (new_leader) {
// become a follower
-common_state_.current_term = req.term;
-common_state_.voted_for = from_address;
+state_.term = req.term;
+state_.voted_for = from_address;
return Follower{
.last_received_append_entries_timestamp = io_.Now(),
};
@@ -293,12 +266,62 @@ class Server {
}
}
+std::optional<Role> Handle(Candidate &candidate, VoteResponse &&res, RequestId request_id, Address from_address) {
+Log("Candidate received VoteResponse");
+if (res.term != state_.term) {
+MG_ASSERT(res.term < state_.term, "Somehow received a VoteResponse from the future!");
+// we received a delayed VoteResponse from the past, which has to do with an election that is
+// no longer valid. We can simply drop this.
+Log("received VoteResponse from old term ", res.term, " but our candidacy term is ", state_.term);
+return std::nullopt;
+}
+if (res.vote_granted) {
+MG_ASSERT(candidate.outstanding_votes.contains(from_address), "Received unexpected VoteResponse!");
+/*
+peer_commit_indices.insert_or_assign(res_env.from_address, res_env.message.commit_index);
+successes++;
+if (successes > (peers_.size() / 2)) {
+success = true;
+break;
+}
+*/
+}
+/*
+if (success) {
+Log("ELECTED");
+return Leader{};
+} else {
+}
+*/
+return std::nullopt;
+}
+/// Periodic protocol maintenance.
+void Cron() {
+// dispatch periodic logic based on our role to a specific Cron method.
+std::optional<Role> new_role = std::visit([&](auto &&role) { return Cron(role); }, role_);
+if (new_role) {
+Log("becoming new role");
+role_ = std::move(new_role).value();
+}
+}
+template <typename AllRoles>
+std::optional<Role> Handle(AllRoles &, VoteResponse &&res, RequestId request_id, Address from_address) {
+Log("non-Candidate received VoteResponse");
+return std::nullopt;
+}
// only leaders actually handle replication requests from clients
-std::optional<Role> Handle(Leader &, ReplicationRequest &&req, uint64_t request_id, Address from_address) {
+std::optional<Role> Handle(Leader &, ReplicationRequest &&req, RequestId request_id, Address from_address) {
Log("leader RECEIVED ReplicationRequest :)");
-// we are the leader. add item to log and send AppendEntries to peers
-common_state_.log.emplace_back(std::pair(common_state_.current_term, std::move(req.opaque_data)));
+// we are the leader. add item to log and send Append to peers
+state_.log.emplace_back(std::pair(state_.term, std::move(req.opaque_data)));
// TODO add message to pending requests buffer, reply asynchronously
return std::nullopt;
@@ -307,14 +330,14 @@ class Server {
// non-leaders respond to replication requests with a redirection to the leader
// template<typename AllRoles>
template <typename AllRoles>
-std::optional<Role> Handle(const AllRoles &, ReplicationRequest &&req, uint64_t request_id, Address from_address) {
+std::optional<Role> Handle(const AllRoles &, ReplicationRequest &&req, RequestId request_id, Address from_address) {
Log("all RECEIVED ReplicationRequest :)");
auto res = ReplicationResponse{};
res.success = false;
-if (common_state_.voted_for) {
-Log("redirecting client to known leader with port ", common_state_.voted_for->last_known_port);
-res.retry_leader = *common_state_.voted_for;
+if (state_.voted_for) {
+Log("redirecting client to known leader with port ", state_.voted_for->last_known_port);
+res.retry_leader = *state_.voted_for;
}
io_.Send(from_address, request_id, res);
@@ -322,26 +345,26 @@ class Server {
return std::nullopt;
}
-// anyone can receive an AppendEntriesRequest and potentially be flipped to a follower
+// anyone can receive an AppendRequest and potentially be flipped to a follower
// state.
template <typename AllRoles>
-std::optional<Role> Handle(AllRoles &, AppendEntriesRequest &&aer, uint64_t request_id, Address from_address) {
-Log("RECEIVED AppendEntries from a leader");
+std::optional<Role> Handle(AllRoles &, AppendRequest &&aer, RequestId request_id, Address from_address) {
+Log("RECEIVED Append from a leader");
bool error = false;
-if (from_address != common_state_.voted_for) {
+if (from_address != state_.voted_for) {
Log("req.from_address is not who we voted for");
error |= true;
-} else if (aer.term != common_state_.current_term) {
+} else if (aer.term != state_.term) {
Log("req.term differs from our current leader term");
error |= true;
-} else if (aer.prev_log_index > common_state_.log.size()) {
+} else if (aer.prev_log_index > state_.log.size()) {
Log("req.prev_log_index is above our last applied log index");
// TODO: buffer this and apply it later rather than having to wait for
// the leader to double-send future segments to us.
error |= true;
} else {
-auto [prev_log_term, data] = common_state_.log.at(aer.prev_log_index);
+auto [prev_log_term, data] = state_.log.at(aer.prev_log_index);
if (aer.prev_log_term != prev_log_term) {
Log("req.prev_log_term differs from our leader term at that slot");
@@ -356,18 +379,18 @@ class Server {
// possibly chop-off stuff that was replaced by
// things with different terms (we got data that
// hasn't reached consensus yet, which is normal)
-// MG_ASSERT(req.last_log_index > common_state_.commit_index);
-common_state_.log.resize(aer.prev_log_index);
+// MG_ASSERT(req.last_log_index > state_.commit_index);
+state_.log.resize(aer.prev_log_index);
-common_state_.log.insert(common_state_.log.end(), aer.entries.begin(), aer.entries.end());
+state_.log.insert(state_.log.end(), aer.entries.begin(), aer.entries.end());
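// advance commit_index to the leader's, but never past the end of our own log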
-common_state_.commit_index = std::min(aer.leader_commit, common_state_.log.size());
+state_.commit_index = std::min(aer.leader_commit, state_.log.size());
}
-auto res = AppendEntriesResponse{
+auto res = AppendResponse{
.success = !error,
-.last_log_term = common_state_.current_term,
-.last_log_index = common_state_.log.size(),
+.last_log_term = state_.term,
+.last_log_index = state_.log.size(),
};
io_.Send(from_address, request_id, res);