Get basic leader election to work and clean some things up
This commit is contained in:
parent
57533f2746
commit
80970a97f0
@ -37,8 +37,8 @@ struct ReplicationResponse {
|
||||
|
||||
struct AppendRequest {
|
||||
Term term;
|
||||
Term prev_log_index;
|
||||
Term prev_log_term;
|
||||
Term last_log_index;
|
||||
Term last_log_term;
|
||||
std::vector<std::pair<Term, Op>> entries;
|
||||
Term leader_commit;
|
||||
};
|
||||
@ -67,31 +67,29 @@ struct VoteResponse {
|
||||
|
||||
struct CommonState {
|
||||
Term term;
|
||||
std::optional<Address> voted_for;
|
||||
std::vector<std::pair<Term, Op>> log;
|
||||
LogIndex commit_index;
|
||||
LogIndex last_applied;
|
||||
};
|
||||
|
||||
struct FollowerTracker {
|
||||
Address address;
|
||||
LogIndex next_index;
|
||||
std::optional<ResponseFuture<AppendResponse>> in_flight_message;
|
||||
Time last_received_append_entries_timestamp = 0;
|
||||
LogIndex confirmed_contiguous_index = 0;
|
||||
};
|
||||
|
||||
struct Leader {
|
||||
std::vector<FollowerTracker> followers;
|
||||
std::map<Address, FollowerTracker> followers;
|
||||
};
|
||||
|
||||
struct Candidate {
|
||||
std::vector<std::pair<Address, LogIndex>> successful_votes;
|
||||
std::map<Address, LogIndex> successful_votes;
|
||||
Time election_began;
|
||||
std::set<Address> outstanding_votes;
|
||||
};
|
||||
|
||||
struct Follower {
|
||||
Time last_received_append_entries_timestamp;
|
||||
Address leader_address;
|
||||
};
|
||||
|
||||
using Role = std::variant<Candidate, Leader, Follower>;
|
||||
@ -154,6 +152,17 @@ class Server {
|
||||
}
|
||||
}
|
||||
|
||||
/// Periodic protocol maintenance.
|
||||
void Cron() {
|
||||
// dispatch periodic logic based on our role to a specific Cron method.
|
||||
std::optional<Role> new_role = std::visit([&](auto &&role) { return Cron(role); }, role_);
|
||||
|
||||
if (new_role) {
|
||||
Log("becoming new role");
|
||||
role_ = std::move(new_role).value();
|
||||
}
|
||||
}
|
||||
|
||||
// Candidates keep sending Vote to peers until:
|
||||
// 1. receiving Append with a higher term (become Follower)
|
||||
// 2. receiving Vote with a higher term (become a Follower)
|
||||
@ -181,7 +190,7 @@ class Server {
|
||||
}
|
||||
|
||||
return Candidate{
|
||||
.successful_votes = std::vector<std::pair<Address, LogIndex>>(),
|
||||
.successful_votes = std::map<Address, LogIndex>(),
|
||||
.election_began = now,
|
||||
.outstanding_votes = outstanding_votes,
|
||||
};
|
||||
@ -240,7 +249,7 @@ class Server {
|
||||
// all roles can receive Vote and possibly become a follower
|
||||
template <typename AllRoles>
|
||||
std::optional<Role> Handle(AllRoles &, VoteRequest &&req, RequestId request_id, Address from_address) {
|
||||
Log("RECEIVED Vote :)");
|
||||
Log("received Vote");
|
||||
bool last_log_term_dominates = req.last_log_term >= LastLogTerm();
|
||||
bool term_dominates = req.term > state_.term;
|
||||
bool last_log_index_dominates = req.last_log_index >= LastLogIndex();
|
||||
@ -257,16 +266,16 @@ class Server {
|
||||
if (new_leader) {
|
||||
// become a follower
|
||||
state_.term = req.term;
|
||||
state_.voted_for = from_address;
|
||||
return Follower{
|
||||
.last_received_append_entries_timestamp = io_.Now(),
|
||||
.leader_address = from_address,
|
||||
};
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<Role> Handle(Candidate &candidate, VoteResponse &&res, RequestId request_id, Address from_address) {
|
||||
std::optional<Role> Handle(Candidate &candidate, VoteResponse &&res, RequestId, Address from_address) {
|
||||
Log("Candidate received VoteResponse");
|
||||
|
||||
if (res.term != state_.term) {
|
||||
@ -278,38 +287,43 @@ class Server {
|
||||
}
|
||||
|
||||
if (res.vote_granted) {
|
||||
MG_ASSERT(candidate.outstanding_votes.contains(from_address), "Received unexpected VoteResponse!");
|
||||
/*
|
||||
peer_commit_indices.insert_or_assign(res_env.from_address, res_env.message.commit_index);
|
||||
successes++;
|
||||
if (successes > (peers_.size() / 2)) {
|
||||
success = true;
|
||||
break;
|
||||
MG_ASSERT(candidate.outstanding_votes.contains(from_address),
|
||||
"Received unexpected VoteResponse from server not present in Candidate's outstanding_votes!");
|
||||
candidate.outstanding_votes.erase(from_address);
|
||||
|
||||
MG_ASSERT(!candidate.successful_votes.contains(from_address),
|
||||
"Received unexpected VoteResponse from server already in Candidate's successful_votes!");
|
||||
candidate.successful_votes.insert({from_address, res.commit_index});
|
||||
|
||||
if (candidate.successful_votes.size() >= candidate.outstanding_votes.size()) {
|
||||
Log("becoming Leader");
|
||||
|
||||
std::map<Address, FollowerTracker> followers{};
|
||||
|
||||
for (const auto &[address, commit_index] : candidate.successful_votes) {
|
||||
FollowerTracker follower{
|
||||
.next_index = commit_index,
|
||||
.confirmed_contiguous_index = commit_index,
|
||||
};
|
||||
followers.insert({address, std::move(follower)});
|
||||
}
|
||||
for (const auto &address : candidate.outstanding_votes) {
|
||||
FollowerTracker follower{
|
||||
.next_index = state_.log.size(),
|
||||
.confirmed_contiguous_index = 0,
|
||||
};
|
||||
followers.insert({address, follower});
|
||||
}
|
||||
|
||||
return Leader{
|
||||
.followers = std::move(followers),
|
||||
};
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
if (success) {
|
||||
Log("ELECTED");
|
||||
return Leader{};
|
||||
} else {
|
||||
}
|
||||
*/
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Periodic protocol maintenance.
|
||||
void Cron() {
|
||||
// dispatch periodic logic based on our role to a specific Cron method.
|
||||
std::optional<Role> new_role = std::visit([&](auto &&role) { return Cron(role); }, role_);
|
||||
|
||||
if (new_role) {
|
||||
Log("becoming new role");
|
||||
role_ = std::move(new_role).value();
|
||||
}
|
||||
}
|
||||
|
||||
template <typename AllRoles>
|
||||
std::optional<Role> Handle(AllRoles &, VoteResponse &&res, RequestId request_id, Address from_address) {
|
||||
Log("non-Candidate received VoteResponse");
|
||||
@ -318,7 +332,7 @@ class Server {
|
||||
|
||||
// only leaders actually handle replication requests from clients
|
||||
std::optional<Role> Handle(Leader &, ReplicationRequest &&req, RequestId request_id, Address from_address) {
|
||||
Log("leader RECEIVED ReplicationRequest :)");
|
||||
Log("leader received ReplicationRequest");
|
||||
|
||||
// we are the leader. add item to log and send Append to peers
|
||||
state_.log.emplace_back(std::pair(state_.term, std::move(req.opaque_data)));
|
||||
@ -327,47 +341,53 @@ class Server {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// non-leaders respond to replication requests with a redirection to the leader
|
||||
// template<typename AllRoles>
|
||||
template <typename AllRoles>
|
||||
std::optional<Role> Handle(const AllRoles &, ReplicationRequest &&req, RequestId request_id, Address from_address) {
|
||||
Log("all RECEIVED ReplicationRequest :)");
|
||||
std::optional<Role> Handle(Follower &follower, ReplicationRequest &&req, RequestId request_id, Address from_address) {
|
||||
auto res = ReplicationResponse{};
|
||||
|
||||
res.success = false;
|
||||
if (state_.voted_for) {
|
||||
Log("redirecting client to known leader with port ", state_.voted_for->last_known_port);
|
||||
res.retry_leader = *state_.voted_for;
|
||||
}
|
||||
Log("redirecting client to known leader with port ", follower.leader_address.last_known_port);
|
||||
res.retry_leader = follower.leader_address;
|
||||
|
||||
io_.Send(from_address, request_id, res);
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// anyone can receive an AppendRequest and potentially be flipped to a follower
|
||||
// state.
|
||||
template <typename AllRoles>
|
||||
std::optional<Role> Handle(AllRoles &, AppendRequest &&aer, RequestId request_id, Address from_address) {
|
||||
Log("RECEIVED Append from a leader");
|
||||
bool error = false;
|
||||
std::optional<Role> Handle(Candidate &, ReplicationRequest &&req, RequestId request_id, Address from_address) {
|
||||
Log("candidate received ReplicationRequest - not redirecting because no leader is known");
|
||||
auto res = ReplicationResponse{};
|
||||
|
||||
res.success = false;
|
||||
|
||||
io_.Send(from_address, request_id, res);
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<Role> Handle(Follower &follower, AppendRequest &&aer, RequestId request_id, Address from_address) {
|
||||
if (follower.leader_address != from_address) {
|
||||
} else if (aer.term != state_.term) {
|
||||
} else {
|
||||
follower.last_received_append_entries_timestamp = io_.Now();
|
||||
}
|
||||
|
||||
/*
|
||||
if (from_address != state_.voted_for) {
|
||||
Log("req.from_address is not who we voted for");
|
||||
error |= true;
|
||||
} else if (aer.term != state_.term) {
|
||||
Log("req.term differs from our current leader term");
|
||||
error |= true;
|
||||
} else if (aer.prev_log_index > state_.log.size()) {
|
||||
Log("req.prev_log_index is above our last applied log index");
|
||||
} else if (aer.last_log_index > state_.log.size()) {
|
||||
Log("req.last_log_index is above our last applied log index");
|
||||
// TODO: buffer this and apply it later rather than having to wait for
|
||||
// the leader to double-send future segments to us.
|
||||
error |= true;
|
||||
} else {
|
||||
auto [prev_log_term, data] = state_.log.at(aer.prev_log_index);
|
||||
auto [last_log_term, data] = state_.log.at(aer.last_log_index);
|
||||
|
||||
if (aer.prev_log_term != prev_log_term) {
|
||||
Log("req.prev_log_term differs from our leader term at that slot");
|
||||
if (aer.last_log_term != last_log_term) {
|
||||
Log("req.last_log_term differs from our leader term at that slot");
|
||||
error |= true;
|
||||
}
|
||||
}
|
||||
@ -380,22 +400,49 @@ class Server {
|
||||
// things with different terms (we got data that
|
||||
// hasn't reached consensus yet, which is normal)
|
||||
// MG_ASSERT(req.last_log_index > state_.commit_index);
|
||||
state_.log.resize(aer.prev_log_index);
|
||||
state_.log.resize(aer.last_log_index);
|
||||
|
||||
state_.log.insert(state_.log.end(), aer.entries.begin(), aer.entries.end());
|
||||
|
||||
state_.commit_index = std::min(aer.leader_commit, state_.log.size());
|
||||
}
|
||||
*/
|
||||
|
||||
auto res = AppendResponse{
|
||||
.success = !error,
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// anyone can receive an AppendRequest and potentially be flipped to a follower
|
||||
// state.
|
||||
template <typename AllRoles>
|
||||
std::optional<Role> Handle(AllRoles &, AppendRequest &&req, RequestId request_id, Address from_address) {
|
||||
Log("non-Follower received Append from a Leader");
|
||||
|
||||
bool error = true;
|
||||
|
||||
std::optional<Role> ret = std::nullopt;
|
||||
|
||||
if (req.term > state_.term) {
|
||||
// become follower of this leader, reply with our log status
|
||||
|
||||
error = false;
|
||||
|
||||
state_.term = req.term;
|
||||
|
||||
ret = Follower{
|
||||
.last_received_append_entries_timestamp = io_.Now(),
|
||||
.leader_address = from_address,
|
||||
};
|
||||
}
|
||||
|
||||
AppendResponse res{
|
||||
.success = error,
|
||||
.last_log_term = state_.term,
|
||||
.last_log_index = state_.log.size(),
|
||||
};
|
||||
|
||||
io_.Send(from_address, request_id, res);
|
||||
|
||||
return std::nullopt;
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename... Ts>
|
||||
@ -446,13 +493,13 @@ int main() {
|
||||
ReplicationRequest cli_req;
|
||||
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
|
||||
|
||||
auto response_future = cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(srv_addr_1, cli_req, 100);
|
||||
ResponseFuture<ReplicationResponse> response_future =
|
||||
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(srv_addr_1, cli_req, 100);
|
||||
|
||||
// receive response
|
||||
auto response_result = response_future.Wait();
|
||||
auto response_envelope = response_result.GetValue();
|
||||
|
||||
auto response = std::any_cast<ReplicationResponse>(response_envelope.message);
|
||||
ResponseResult<ReplicationResponse> response_result = response_future.Wait();
|
||||
ResponseEnvelope<ReplicationResponse> response_envelope = response_result.GetValue();
|
||||
ReplicationResponse response = response_envelope.message;
|
||||
|
||||
MG_ASSERT(response.success);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user