Fix several bugs, clean-ups and tuning

This commit is contained in:
Tyler Neely 2022-07-19 14:57:01 +00:00
parent 5dd0ddc352
commit 689336e765
2 changed files with 61 additions and 44 deletions
src/io/v3
tests/simulation

View File

@ -275,7 +275,8 @@ class SimulatorHandle {
return false;
}
// clock ticks forwards by this many microseconds on average
// We tick the clock forward when all servers are blocked but
// there are no in-flight messages to schedule delivery of.
std::poisson_distribution<> time_distrib(100);
uint64_t clock_advance = time_distrib(rng_);
cluster_wide_time_microseconds_ += clock_advance;

View File

@ -161,7 +161,7 @@ class Server {
// 5 -> 3 (index 2)
state_.committed_log_size = indices[(indices.size() / 2)];
Log("leader committed_log_size is now ", state_.committed_log_size);
Log("Leader committed_log_size is now ", state_.committed_log_size);
while (!leader.pending_client_requests.empty()) {
auto &front = leader.pending_client_requests.front();
@ -180,7 +180,7 @@ class Server {
}
void BroadcastAppendEntries(std::map<Address, FollowerTracker> &followers) {
Log("leader broadcasting, total log size is ", state_.log.size());
Log("Leader broadcasting, total log size is ", state_.log.size());
for (auto &[address, follower] : followers) {
LogIndex index = follower.confirmed_contiguous_index;
@ -188,10 +188,12 @@ class Server {
entries.insert(entries.begin(), state_.log.begin() + index, state_.log.end());
Term previous_term_from_index = PreviousTermFromIndex(index);
Log("previous term from index ", index, " is ", previous_term_from_index);
AppendRequest ar{
.term = state_.term,
.last_log_index = index,
.last_log_term = PreviousTermFromIndex(index),
.last_log_term = previous_term_from_index,
.entries = entries,
.leader_commit = state_.committed_log_size,
};
@ -203,13 +205,15 @@ class Server {
}
}
void TimeOutOldClientRequests() { Duration client_request_timeout = RandomTimeout(10000, 150000); }
// Returns a timeout sampled uniformly from the closed interval [min, max].
//
// The sample is produced by handing a std::uniform_int_distribution to
// io_.Rand, so the randomness comes from whatever source io_ provides.
// NOTE(review): the distribution uses its default result type (int); if
// Duration is wider than int the bounds are narrowed here - confirm.
Duration RandomTimeout(Duration min, Duration max) {
  // Parenthesised construction on purpose: brace-init could reject a narrowing conversion.
  std::uniform_int_distribution<> timeout_range(min, max);
  return io_.Rand(timeout_range);
}
Term PreviousTermFromIndex(LogIndex index) {
if (state_.log.size() <= index + 1) {
if (index == 0 || state_.log.size() + 1 <= index) {
return 0;
} else {
auto &[term, data] = state_.log.at(index - 1);
@ -261,6 +265,8 @@ class Server {
if (now - candidate.election_began > election_timeout) {
state_.term++;
Log("becoming Candidate for term ", (int)state_.term);
VoteRequest request{
.term = state_.term,
.last_log_index = LastLogIndex(),
@ -276,7 +282,6 @@ class Server {
outstanding_votes.insert(peer);
}
Log("becoming Candidate for term ", (int)state_.term);
return Candidate{
.successful_votes = std::map<Address, LogIndex>(),
.election_began = now,
@ -344,6 +349,11 @@ class Server {
bool last_log_index_dominates = req.last_log_index >= LastLogIndex();
bool new_leader = last_log_term_dominates && term_dominates && last_log_index_dominates;
if (new_leader) {
MG_ASSERT(req.term > state_.term);
MG_ASSERT(std::max(req.term, state_.term) == req.term);
}
VoteResponse res{
.term = std::max(req.term, state_.term),
.committed_log_size = state_.committed_log_size,
@ -367,49 +377,46 @@ class Server {
std::optional<Role> Handle(Candidate &candidate, VoteResponse &&res, RequestId, Address from_address) {
Log("Candidate received VoteResponse");
if (res.term != state_.term) {
MG_ASSERT(res.term < state_.term, "Somehow received a VoteResponse from the future!");
if (!res.vote_granted || res.term != state_.term) {
Log("received unsuccessful VoteResponse from term ", res.term, " when our candidacy term is ", state_.term);
// we received a delayed VoteResponse from the past, which has to do with an election that is
// no longer valid. We can simply drop this.
Log("received VoteResponse from old term ", res.term, " but our candidacy term is ", state_.term);
return std::nullopt;
}
if (res.vote_granted) {
MG_ASSERT(candidate.outstanding_votes.contains(from_address),
"Received unexpected VoteResponse from server not present in Candidate's outstanding_votes!");
candidate.outstanding_votes.erase(from_address);
MG_ASSERT(candidate.outstanding_votes.contains(from_address),
"Received unexpected VoteResponse from server not present in Candidate's outstanding_votes!");
candidate.outstanding_votes.erase(from_address);
MG_ASSERT(!candidate.successful_votes.contains(from_address),
"Received unexpected VoteResponse from server already in Candidate's successful_votes!");
candidate.successful_votes.insert({from_address, res.committed_log_size});
MG_ASSERT(!candidate.successful_votes.contains(from_address),
"Received unexpected VoteResponse from server already in Candidate's successful_votes!");
candidate.successful_votes.insert({from_address, res.committed_log_size});
if (candidate.successful_votes.size() >= candidate.outstanding_votes.size()) {
std::map<Address, FollowerTracker> followers{};
if (candidate.successful_votes.size() >= candidate.outstanding_votes.size()) {
std::map<Address, FollowerTracker> followers{};
for (const auto &[address, committed_log_size] : candidate.successful_votes) {
FollowerTracker follower{
.next_index = committed_log_size,
.confirmed_contiguous_index = committed_log_size,
};
followers.insert({address, std::move(follower)});
}
for (const auto &address : candidate.outstanding_votes) {
FollowerTracker follower{
.next_index = state_.log.size(),
.confirmed_contiguous_index = 0,
};
followers.insert({address, follower});
}
BroadcastAppendEntries(followers);
Log("becoming Leader at term ", (int)state_.term);
return Leader{
.followers = std::move(followers),
.pending_client_requests = std::deque<PendingClientRequest>(),
for (const auto &[address, committed_log_size] : candidate.successful_votes) {
FollowerTracker follower{
.next_index = committed_log_size,
.confirmed_contiguous_index = committed_log_size,
};
followers.insert({address, std::move(follower)});
}
for (const auto &address : candidate.outstanding_votes) {
FollowerTracker follower{
.next_index = state_.log.size(),
.confirmed_contiguous_index = 0,
};
followers.insert({address, std::move(follower)});
}
BroadcastAppendEntries(followers);
Log("becoming Leader at term ", (int)state_.term);
return Leader{
.followers = std::move(followers),
.pending_client_requests = std::deque<PendingClientRequest>(),
};
}
return std::nullopt;
@ -423,7 +430,7 @@ class Server {
// only leaders actually handle replication requests from clients
std::optional<Role> Handle(Leader &leader, ReplicationRequest &&req, RequestId request_id, Address from_address) {
Log("leader received ReplicationRequest");
Log("Leader received ReplicationRequest");
// we are the leader. add item to log and send Append to peers
state_.log.emplace_back(std::pair(state_.term, std::move(req.opaque_data)));
@ -446,7 +453,7 @@ class Server {
auto res = ReplicationResponse{};
res.success = false;
Log("redirecting client to known leader with port ", follower.leader_address.last_known_port);
Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port);
res.retry_leader = follower.leader_address;
io_.Send(from_address, request_id, res);
@ -455,7 +462,7 @@ class Server {
}
std::optional<Role> Handle(Candidate &, ReplicationRequest &&req, RequestId request_id, Address from_address) {
Log("candidate received ReplicationRequest - not redirecting because no leader is known");
Log("Candidate received ReplicationRequest - not redirecting because no Leader is known");
auto res = ReplicationResponse{};
res.success = false;
@ -553,7 +560,8 @@ class Server {
// Writes a single log line to stdout.
//
// The line starts with the current time reported by io_.Now(), followed by
// the tag "raft server" and this server's last known port, and then every
// element of `args` streamed in order with operator<<.
//
// The "raft server <port>" tag is emitted exactly once per line; previously
// it was streamed both before and after the timestamp.
template <typename... Ts>
void Log(Ts &&...args) {
  const auto now = io_.Now();
  // Both values are printed as int so they render as numbers.
  std::cout << static_cast<int>(now) << " raft server " << static_cast<int>(io_.GetAddress().last_known_port) << " ";
  // std::endl terminates the line and flushes the stream after every message.
  (std::cout << ... << args) << std::endl;
}
};
@ -600,6 +608,8 @@ void RunSimulation() {
auto srv_thread_3 = std::jthread(RunServer<SimulatorTransport>, std::move(srv_3));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_3);
std::cout << "beginning test after servers have become quiescent" << std::endl;
bool success = false;
Address leader = srv_addr_1;
for (int retries = 0; retries < 30; retries++) {
@ -608,10 +618,16 @@ void RunSimulation() {
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
ResponseFuture<ReplicationResponse> response_future =
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(leader, cli_req, 100);
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(leader, cli_req, 5000);
// receive response
ResponseResult<ReplicationResponse> response_result = response_future.Wait();
if (response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
continue;
}
ResponseEnvelope<ReplicationResponse> response_envelope = response_result.GetValue();
ReplicationResponse response = response_envelope.message;