A large batch of bug fixes and clean-ups

This commit is contained in:
Tyler Neely 2022-07-20 13:18:00 +00:00
parent 689336e765
commit afef6dc11b
3 changed files with 112 additions and 42 deletions

View File

@ -16,4 +16,6 @@ struct SimulatorConfig {
// Simulation tuning knobs (tail of SimulatorConfig; the struct header is outside this view).
bool perform_timeouts = false;    // enable simulated request timeouts -- usage not visible here; confirm
bool scramble_messages = true;    // when true, the simulator may deliver in-flight messages out of order
uint64_t rng_seed = 0;            // seed for the simulator's RNG -- presumably makes runs reproducible; confirm
uint64_t start_time = 0;          // initial cluster-wide clock value (the SimulatorHandle ctor starts its clock here)
uint64_t abort_time = ULLONG_MAX; // the simulator asserts if its clock advances past this point
};

View File

@ -213,7 +213,7 @@ class SimulatorHandle {
// messages that are sent to servers that may later receive them
std::map<Address, std::vector<OpaqueMessage>> can_receive_;
uint64_t cluster_wide_time_microseconds_ = 1000000; // it's one million (microseconds) o'clock!
uint64_t cluster_wide_time_microseconds_;
bool should_shut_down_ = false;
SimulatorStats stats_;
size_t blocked_on_receive_ = 0;
@ -222,7 +222,7 @@ class SimulatorHandle {
std::mt19937 rng_{};
public:
SimulatorHandle(SimulatorConfig config) : config_(config) {}
SimulatorHandle(SimulatorConfig config) : cluster_wide_time_microseconds_(config.start_time), config_(config) {}
void IncrementServerCountAndWaitForQuiescentState(Address address) {
std::unique_lock<std::mutex> lock(mu_);
@ -275,17 +275,21 @@ class SimulatorHandle {
return false;
}
// We tick the clock forward when all servers are blocked but
// there are no in-flight messages to schedule delivery of.
std::poisson_distribution<> time_distrib(100);
uint64_t clock_advance = time_distrib(rng_);
cluster_wide_time_microseconds_ += clock_advance;
cv_.notify_all();
if (in_flight_.empty()) {
// return early here because there are no messages to schedule
return false;
// We tick the clock forward when all servers are blocked but
// there are no in-flight messages to schedule delivery of.
std::poisson_distribution<> time_distrib(50);
uint64_t clock_advance = time_distrib(rng_);
cluster_wide_time_microseconds_ += clock_advance;
MG_ASSERT(cluster_wide_time_microseconds_ < config_.abort_time,
"Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
"in an expected amount of time.");
return true;
}
if (config_.scramble_messages) {
@ -360,6 +364,8 @@ class SimulatorHandle {
stats_.total_messages_++;
stats_.total_requests_++;
cv_.notify_all();
return;
}
@ -403,6 +409,10 @@ class SimulatorHandle {
std::any message_any(std::move(message));
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message_any)};
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
stats_.total_messages_++;
cv_.notify_all();
}
uint64_t Now() {

View File

@ -96,17 +96,24 @@ struct PendingClientRequest {
// Raft Leader role state.
struct Leader {
// per-follower replication progress (next_index / confirmed_contiguous_index), keyed by address
std::map<Address, FollowerTracker> followers;
// client requests accepted but not yet acknowledged
std::deque<PendingClientRequest> pending_client_requests;
// when we last broadcast AppendEntries; used to rate-limit broadcasts in Cron(Leader&)
Time last_broadcast = 0;
void Print() { std::cout << "\tLeader \t"; }
};
// Raft Candidate role state: tracks one in-progress election attempt.
// Fix: the member `election_began` was declared twice (a stale,
// uninitialized duplicate of the `= 0` declaration); keep only the
// initialized declaration so the struct compiles.
struct Candidate {
  // votes received so far, with each voter's reported log index
  std::map<Address, LogIndex> successful_votes;
  // when this election attempt started; compared against a randomized
  // election timeout in Cron(Candidate&) to decide when to retry
  Time election_began = 0;
  // peers we requested a vote from but have not yet heard back from
  std::set<Address> outstanding_votes;
  void Print() { std::cout << "\tCandidate\t"; }
};
// Raft Follower role state.
struct Follower {
  // Last time we saw an AppendEntries from our leader; default-initialized
  // to 0 for consistency with Candidate::election_began and to avoid an
  // indeterminate value if any construction path skips the designated
  // initializer (visible call sites always set it explicitly).
  Time last_received_append_entries_timestamp = 0;
  // the leader whose AppendEntries we currently accept
  Address leader_address;
  void Print() { std::cout << "\tFollower \t"; }
};
using Role = std::variant<Candidate, Leader, Follower>;
@ -126,7 +133,7 @@ class Server {
while (!io_.ShouldShutDown()) {
auto now = io_.Now();
Duration random_cron_interval = RandomTimeout(500, 2000);
Duration random_cron_interval = RandomTimeout(1000, 2000);
if (now - last_cron > random_cron_interval) {
Cron();
last_cron = now;
@ -153,12 +160,11 @@ class Server {
auto indices = std::vector<LogIndex>{state_.log.size()};
for (const auto &[addr, f] : leader.followers) {
indices.push_back(f.confirmed_contiguous_index);
Log("Follower at port ", (int)addr.last_known_port,
" has confirmed contiguous index of: ", f.confirmed_contiguous_index);
}
std::ranges::sort(indices, std::ranges::greater());
// assuming reverse sort (using std::ranges::greater)
// 3 -> 2 (index 1)
// 4 -> 3 (index 2)
// 5 -> 3 (index 2)
state_.committed_log_size = indices[(indices.size() / 2)];
Log("Leader committed_log_size is now ", state_.committed_log_size);
@ -180,16 +186,20 @@ class Server {
}
void BroadcastAppendEntries(std::map<Address, FollowerTracker> &followers) {
Log("Leader broadcasting, total log size is ", state_.log.size());
for (auto &[address, follower] : followers) {
LogIndex index = follower.confirmed_contiguous_index;
std::vector<std::pair<Term, Op>> entries;
entries.insert(entries.begin(), state_.log.begin() + index, state_.log.end());
if (state_.log.size() > index) {
entries.insert(entries.begin(), state_.log.begin() + index, state_.log.end());
}
Term previous_term_from_index = PreviousTermFromIndex(index);
Log("previous term from index ", index, " is ", previous_term_from_index);
Log("Leader sending ", entries.size(), " entries to Follower ", address.last_known_port,
" which are above its known index of ", index);
AppendRequest ar{
.term = state_.term,
.last_log_index = index,
@ -205,8 +215,6 @@ class Server {
}
}
// Stub: intended to expire client requests that have waited too long.
// Currently a no-op -- the computed timeout is never applied (the call
// site in Cron(Leader&) is still commented out as a TODO). Marked
// [[maybe_unused]] so the dead local does not trip -Wunused-variable.
void TimeOutOldClientRequests() {
  [[maybe_unused]] Duration client_request_timeout = RandomTimeout(10000, 150000);
}
// Draw a uniformly distributed timeout in [min, max] using the
// Io-provided RNG, so randomness stays under the simulator's control.
Duration RandomTimeout(Duration min, Duration max) {
  std::uniform_int_distribution<> uniform_range(min, max);
  return io_.Rand(uniform_range);
}
/// Periodic protocol maintenance.
void Cron() {
Log("running Cron");
// dispatch periodic logic based on our role to a specific Cron method.
std::optional<Role> new_role = std::visit([&](auto &&role) { return Cron(role); }, role_);
@ -261,11 +268,12 @@ class Server {
// 3. receiving a quorum of responses to our last batch of Vote (become a Leader)
std::optional<Role> Cron(Candidate &candidate) {
auto now = io_.Now();
Duration election_timeout = RandomTimeout(100000, 150000);
Duration election_timeout = RandomTimeout(100000, 200000);
if (now - candidate.election_began > election_timeout) {
state_.term++;
Log("becoming Candidate for term ", (int)state_.term);
Log("becoming Candidate for term ", (int)state_.term, " after leader timeout of ", (int)election_timeout,
" elapsed since last election attempt");
VoteRequest request{
.term = state_.term,
@ -296,7 +304,7 @@ class Server {
std::optional<Role> Cron(Follower &follower) {
auto now = io_.Now();
auto time_since_last_append_entries = now - follower.last_received_append_entries_timestamp;
Duration election_timeout = RandomTimeout(100000, 150000);
Duration election_timeout = RandomTimeout(100000, 200000);
// randomized follower election timeout in the 100000-200000 microsecond (100-200ms) range
if (time_since_last_append_entries > election_timeout) {
@ -309,8 +317,14 @@ class Server {
// Leaders (re)send AppendRequest to followers, rate-limited by a
// randomized broadcast timeout so each Cron tick does not flood the
// network. Fix: a stale unconditional BroadcastAppendEntries call
// (superseded by the rate-limited version below) was still present
// before the timeout check; it is removed so broadcasts only happen
// when the timeout has elapsed.
std::optional<Role> Cron(Leader &leader) {
  // TODO time-out client requests if we haven't made progress after some threshold
  Time now = io_.Now();
  Duration broadcast_timeout = RandomTimeout(20000, 30000);
  if (now - leader.last_broadcast > broadcast_timeout) {
    BroadcastAppendEntries(leader.followers);
    leader.last_broadcast = now;
  }
  // TODO TimeOutOldClientRequests();
  return std::nullopt;
}
@ -343,7 +357,7 @@ class Server {
// all roles can receive Vote and possibly become a follower
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &, VoteRequest &&req, RequestId request_id, Address from_address) {
Log("received Vote");
Log("received Vote from ", (int)from_address.last_known_port, " with term ", req.term);
bool last_log_term_dominates = req.last_log_term >= LastLogTerm();
bool term_dominates = req.term > state_.term;
bool last_log_index_dominates = req.last_log_index >= LastLogIndex();
@ -410,9 +424,10 @@ class Server {
followers.insert({address, std::move(follower)});
}
Log("becoming Leader at term ", (int)state_.term);
BroadcastAppendEntries(followers);
Log("becoming Leader at term ", (int)state_.term);
return Leader{
.followers = std::move(followers),
.pending_client_requests = std::deque<PendingClientRequest>(),
@ -483,16 +498,24 @@ class Server {
.last_log_index = CommittedLogIndex(),
};
if constexpr (std::is_same<AllRoles, Leader>()) {
MG_ASSERT(req.term != state_.term, "Multiple leaders are acting under the term ", req.term);
}
bool is_candidate = std::is_same<AllRoles, Candidate>();
bool is_failed_competitor = is_candidate && req.term == state_.term;
Time now = io_.Now();
// Handle early-exit conditions.
if (req.term > state_.term) {
if (req.term > state_.term || is_failed_competitor) {
// become follower of this leader, reply with our log status
state_.term = req.term;
io_.Send(from_address, request_id, res);
Log("becoming Follower");
Log("becoming Follower of Leader ", (int)from_address.last_known_port, " at term ", (int)req.term);
return Follower{
.last_received_append_entries_timestamp = io_.Now(),
.last_received_append_entries_timestamp = now,
.leader_address = from_address,
};
} else if (req.term < state_.term) {
@ -502,12 +525,22 @@ class Server {
return std::nullopt;
};
// at this point, we're dealing with our own leader
if constexpr (std::is_same<AllRoles, Follower>()) {
// small specialization for when we're already a Follower
MG_ASSERT(role.leader_address == from_address, "Multiple Leaders are acting under the same term number!");
role.last_received_append_entries_timestamp = io_.Now();
role.last_received_append_entries_timestamp = now;
} else {
Log("Somehow entered Follower-specific logic as a non-Follower");
MG_ASSERT(false, "Somehow entered Follower-specific logic as a non-Follower");
}
res.last_log_term = LastLogTerm();
res.last_log_index = LastLogIndex();
Log("returning last_log_index of ", res.last_log_index);
// Handle steady-state conditions.
if (req.last_log_index != LastLogIndex()) {
Log("req.last_log_index is above our last applied log index");
@ -515,9 +548,12 @@ class Server {
Log("req.last_log_term differs from our leader term at that slot, expected: ", LastLogTerm(), " but got ",
req.last_log_term);
} else {
// happy path
// happy path - apply log
Log("Follower applying batch of entries to log of size ", req.entries.size());
MG_ASSERT(req.last_log_index >= state_.committed_log_size,
"Applied history from Leader which goes back in time from our commit_index");
// possibly chop-off stuff that was replaced by
// things with different terms (we got data that
// hasn't reached consensus yet, which is normal)
@ -528,8 +564,6 @@ class Server {
state_.committed_log_size = std::min(req.leader_commit, LastLogIndex());
res.success = true;
res.last_log_term = LastLogTerm();
res.last_log_index = LastLogIndex();
}
io_.Send(from_address, request_id, res);
@ -540,9 +574,16 @@ class Server {
std::optional<Role> Handle(Leader &leader, AppendResponse &&res, RequestId request_id, Address from_address) {
if (res.term != state_.term) {
} else if (!leader.followers.contains(from_address)) {
} else if (!res.success) {
Log("received AppendResponse from unknown Follower");
MG_ASSERT(false, "received AppendResponse from unknown Follower");
} else {
Log("Leader got successful AppendResponse");
if (res.success) {
Log("Leader got successful AppendResponse from ", (int)from_address.last_known_port, " with last_log_index of ",
(int)res.last_log_index);
} else {
Log("Leader got unsuccessful AppendResponse from ", (int)from_address.last_known_port,
" with last_log_index of ", (int)res.last_log_index);
}
FollowerTracker &follower = leader.followers.at(from_address);
follower.next_index = std::max(follower.next_index, res.last_log_index);
follower.confirmed_contiguous_index = std::max(follower.confirmed_contiguous_index, res.last_log_index);
@ -560,8 +601,13 @@ class Server {
template <typename... Ts>
void Log(Ts &&...args) {
auto now = io_.Now();
std::cout << (int)now << " raft server " << (int)io_.GetAddress().last_known_port << " ";
Time now = io_.Now();
Term term = state_.term;
std::cout << '\t' << (int)now << "\t" << (int)term << "\t" << (int)io_.GetAddress().last_known_port;
std::visit([&](auto &&role) { role.Print(); }, role_);
(std::cout << ... << args) << std::endl;
}
};
@ -577,6 +623,8 @@ void RunSimulation() {
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
.start_time = 200000,
.abort_time = 1 * 1024 * 1024,
};
auto simulator = Simulator(config);
@ -610,15 +658,19 @@ void RunSimulation() {
std::cout << "beginning test after servers have become quiescent" << std::endl;
std::mt19937 cli_rng_{};
Address server_addrs[]{srv_addr_1, srv_addr_2, srv_addr_3};
bool success = false;
Address leader = srv_addr_1;
for (int retries = 0; retries < 30; retries++) {
Address leader = server_addrs[0];
for (int retries = 0; retries < 100; retries++) {
// send request
ReplicationRequest cli_req;
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
std::cout << "client sending ReplicationRequest to Leader " << (int)leader.last_known_port << std::endl;
ResponseFuture<ReplicationResponse> response_future =
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(leader, cli_req, 5000);
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(leader, cli_req, 50000);
// receive response
ResponseResult<ReplicationResponse> response_result = response_future.Wait();
@ -640,7 +692,12 @@ void RunSimulation() {
leader = response.retry_leader.value();
std::cout << "client redirected to leader server " << leader.last_known_port << std::endl;
} else {
std::cout << "client NOT redirected to leader server " << std::endl;
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
size_t addr_index = addr_distrib(cli_rng_);
leader = server_addrs[addr_index];
std::cout << "client NOT redirected to leader server, trying a random one at index " << (int)addr_index
<< " with port " << (int)leader.last_known_port << std::endl;
}
}
@ -662,6 +719,7 @@ int main() {
for (int i = 0; i < n_tests; i++) {
std::cout << "========================== NEW SIMULATION " << i << " ==========================" << std::endl;
std::cout << "\tTime\tTerm\tPort\tRole\t\tMessage\n";
RunSimulation();
}