Start cranking up the testing intensity and fix a variety of correctness and code quality issues

Tyler Neely 2022-07-15 16:19:00 +00:00
parent ad1d8637e5
commit 0dc69c180d
4 changed files with 131 additions and 101 deletions

View File

@@ -21,9 +21,13 @@
#include "transport.hpp"
class SimulatorTransport {
std::shared_ptr<SimulatorHandle> simulator_handle_;
Address address_;
std::mt19937 rng_{};
public:
SimulatorTransport(std::shared_ptr<SimulatorHandle> simulator_handle, Address address)
: simulator_handle_(simulator_handle), address_(address) {}
SimulatorTransport(std::shared_ptr<SimulatorHandle> simulator_handle, Address address, uint64_t seed)
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
template <Message Request, Message Response>
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request,
@@ -53,27 +57,27 @@ class SimulatorTransport {
template <class D = std::poisson_distribution<>, class Return = uint64_t>
Return Rand(D distrib) {
return simulator_handle_->Rand<D, Return>(distrib);
return distrib(rng_);
}
private:
std::shared_ptr<SimulatorHandle> simulator_handle_;
Address address_;
};
class Simulator {
std::mt19937 rng_{};
std::shared_ptr<SimulatorHandle> simulator_handle_;
public:
Simulator(SimulatorConfig config) { simulator_handle_->SetConfig(config); }
Simulator(SimulatorConfig config)
: rng_(std::mt19937{config.rng_seed}), simulator_handle_{std::make_shared<SimulatorHandle>(config)} {}
void ShutDown() { simulator_handle_->ShutDown(); }
Io<SimulatorTransport> Register(Address address, bool is_server) {
if (is_server) {
simulator_handle_->IncrementServerCount(address);
}
return Io(SimulatorTransport(simulator_handle_, address), address);
Io<SimulatorTransport> Register(Address address) {
std::uniform_int_distribution<uint64_t> seed_distrib{};
uint64_t seed = seed_distrib(rng_);
return Io(SimulatorTransport(simulator_handle_, address, seed), address);
}
private:
std::shared_ptr<SimulatorHandle> simulator_handle_{std::make_shared<SimulatorHandle>()};
void IncrementServerCountAndWaitForQuiescentState(Address address) {
simulator_handle_->IncrementServerCountAndWaitForQuiescentState(address);
}
};
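The seeding scheme above is the heart of the determinism work in this commit: Simulator owns one master std::mt19937 seeded from config.rng_seed, and each SimulatorTransport registered with it receives its own derived seed, so a single number reproduces every component's random stream. A minimal standalone sketch of the pattern (names here are illustrative, not the project's API):

#include <cstdint>
#include <iostream>
#include <random>
#include <vector>

int main() {
  const uint64_t master_seed = 0;  // one knob reproduces the whole run
  std::mt19937 master{master_seed};
  std::uniform_int_distribution<uint64_t> seed_distrib{};

  // Each registered component gets its own engine, seeded from the master,
  // so draws in one component cannot perturb another component's stream.
  std::vector<std::mt19937> component_rngs;
  for (int i = 0; i < 3; i++) {
    component_rngs.emplace_back(seed_distrib(master));
  }

  // Re-running with the same master_seed reproduces every line below.
  for (auto &rng : component_rngs) {
    std::cout << rng() << "\n";
  }
  return 0;
}

Handing out derived seeds, rather than sharing one engine behind a lock, keeps each component's draws independent of thread scheduling order.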

View File

@@ -101,6 +101,13 @@ struct PromiseKey {
};
class OpaquePromise {
const std::type_info *ti_;
void *ptr_;
std::function<void(void *)> dtor_;
std::function<bool(void *)> is_awaited_;
std::function<void(void *, OpaqueMessage)> fill_;
std::function<void(void *)> time_out_;
public:
OpaquePromise(OpaquePromise &&old)
: ti_(old.ti_),
@@ -183,14 +190,6 @@ class OpaquePromise {
dtor_(ptr_);
}
}
private:
const std::type_info *ti_;
void *ptr_;
std::function<void(void *)> dtor_;
std::function<bool(void *)> is_awaited_;
std::function<void(void *, OpaqueMessage)> fill_;
std::function<void(void *)> time_out_;
};
struct DeadlineAndOpaquePromise {
@@ -199,17 +198,56 @@ struct DeadlineAndOpaquePromise {
};
class SimulatorHandle {
std::mutex mu_{};
std::condition_variable cv_;
// messages that have not yet been scheduled or dropped
std::vector<std::pair<Address, OpaqueMessage>> in_flight_;
// the responses to requests that are being waited on
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
// messages that are sent to servers that may later receive them
std::map<Address, std::vector<OpaqueMessage>> can_receive_;
uint64_t cluster_wide_time_microseconds_ = 1000000; // it's one million (microseconds) o'clock!
bool should_shut_down_ = false;
SimulatorStats stats_;
size_t blocked_on_receive_ = 0;
std::set<Address> server_addresses_;
SimulatorConfig config_;
std::mt19937 rng_{};
public:
void IncrementServerCount(Address address) {
SimulatorHandle(SimulatorConfig config) : config_(config), rng_(std::mt19937{config.rng_seed}) {}
void IncrementServerCountAndWaitForQuiescentState(Address address) {
std::unique_lock<std::mutex> lock(mu_);
servers_++;
server_addresses_.insert(address);
while (true) {
size_t blocked_servers = blocked_on_receive_;
for (auto &[promise_key, opaque_promise] : promises_) {
if (opaque_promise.promise.IsAwaited()) {
if (server_addresses_.contains(promise_key.requester_address)) {
blocked_servers++;
}
}
}
bool all_servers_blocked = blocked_servers == server_addresses_.size();
if (all_servers_blocked) {
return;
}
cv_.wait(lock);
}
}
bool MaybeTickSimulator() {
std::unique_lock<std::mutex> lock(mu_);
// TODO nuke this notify
cv_.notify_all();
size_t blocked_servers = blocked_on_receive_;
@@ -221,13 +259,7 @@ class SimulatorHandle {
}
}
// TODO this is not deterministic time advancement
// clock ticks forwards by this many microseconds on average
std::poisson_distribution<> time_distrib(100);
uint64_t clock_advance = time_distrib(rng_);
cluster_wide_time_microseconds_ += clock_advance;
if (blocked_servers < servers_) {
if (blocked_servers < server_addresses_.size()) {
// we only need to advance the simulator when all
// servers have reached a quiescent state, blocked
// on their own futures or receive methods.
@@ -240,6 +272,12 @@ class SimulatorHandle {
return false;
}
// TODO this is not deterministic time advancement
// clock ticks forwards by this many microseconds on average
std::poisson_distribution<> time_distrib(100);
uint64_t clock_advance = time_distrib(rng_);
cluster_wide_time_microseconds_ += clock_advance;
// std::cout << "looking at message in tick" << std::endl;
if (config_.scramble_messages) {
@@ -285,15 +323,9 @@ class SimulatorHandle {
return true;
}
void SetConfig(SimulatorConfig config) {
std::unique_lock<std::mutex> lock(mu_);
rng_ = std::mt19937{config.rng_seed};
config_ = config;
cv_.notify_all();
}
void ShutDown() {
std::unique_lock<std::mutex> lock(mu_);
std::cout << "Shutting down" << std::endl;
should_shut_down_ = true;
cv_.notify_all();
}
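The Poisson-distributed tick a couple of hunks up advances the cluster-wide clock by roughly 100 simulated microseconds on average, with occasional larger jumps that help exercise timeout paths. A standalone illustration of the same advancement loop, seeded for reproducibility:

#include <cstdint>
#include <iostream>
#include <random>

int main() {
  std::mt19937 rng{0};
  // Mean of 100: each tick advances the cluster clock ~100us on average.
  std::poisson_distribution<> time_distrib(100);

  uint64_t cluster_wide_time_microseconds = 1'000'000;
  for (int tick = 0; tick < 5; tick++) {
    uint64_t clock_advance = time_distrib(rng);
    cluster_wide_time_microseconds += clock_advance;
    std::cout << "tick " << tick << " advanced " << clock_advance << "us -> "
              << cluster_wide_time_microseconds << "\n";
  }
  return 0;
}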
@@ -350,6 +382,7 @@ class SimulatorHandle {
bool made_progress = MaybeTickSimulator();
lock.lock();
if (!should_shut_down_ && !made_progress) {
std::cout << "waiting on cv" << std::endl;
cv_.wait(lock);
}
blocked_on_receive_ -= 1;
@@ -376,26 +409,4 @@ class SimulatorHandle {
std::unique_lock<std::mutex> lock(mu_);
return distrib(rng_);
}
private:
std::mutex mu_{};
std::condition_variable cv_;
// messages that have not yet been scheduled or dropped
std::vector<std::pair<Address, OpaqueMessage>> in_flight_;
// the responses to requests that are being waited on
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
// messages that are sent to servers that may later receive them
std::map<Address, std::vector<OpaqueMessage>> can_receive_;
uint64_t cluster_wide_time_microseconds_ = 1000000; // it's one million (microseconds) o'clock!
bool should_shut_down_ = false;
SimulatorStats stats_;
size_t blocked_on_receive_ = 0;
size_t servers_ = 0;
std::set<Address> server_addresses_;
SimulatorConfig config_;
std::mt19937 rng_{};
};
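IncrementServerCountAndWaitForQuiescentState is the new synchronization backbone: server threads park in Receive and bump blocked_on_receive_, while the registering thread waits until every known server is either parked or awaiting a promise before starting the next one. A cut-down sketch of that handshake, reduced to a bare counter without the promise bookkeeping:

#include <condition_variable>
#include <cstddef>
#include <mutex>

class QuiescenceGate {
  std::mutex mu_;
  std::condition_variable cv_;
  size_t servers_ = 0;
  size_t blocked_ = 0;

 public:
  // Called by the spawning thread after starting a new server thread.
  void AddServerAndWaitForQuiescence() {
    std::unique_lock<std::mutex> lock(mu_);
    servers_++;
    // Wake-ups come from BlockOnReceive; re-check until all are parked.
    cv_.wait(lock, [&] { return blocked_ == servers_; });
  }

  // Called by a server thread as it parks waiting for a message.
  void BlockOnReceive() {
    std::unique_lock<std::mutex> lock(mu_);
    blocked_++;
    cv_.notify_all();
  }

  // Called by a server thread once a message arrives and it resumes.
  void UnblockOnMessage() {
    std::unique_lock<std::mutex> lock(mu_);
    blocked_--;
  }
};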

View File

@@ -75,6 +75,11 @@ using RequestResult = BasicResult<TimedOut, RequestEnvelope<Ms...>>;
template <typename I>
class Io {
I implementation_;
Address address_;
uint64_t request_id_counter_ = 0;
uint64_t default_timeout_microseconds_ = 50 * 1000;
public:
Io(I io, Address address) : implementation_(io), address_(address) {}
@@ -137,10 +142,4 @@ class Io {
}
Address GetAddress() { return address_; }
private:
I implementation_;
Address address_;
uint64_t request_id_counter_ = 0;
uint64_t default_timeout_microseconds_ = 50 * 1000;
};

View File

@@ -13,6 +13,8 @@
// TODO(tyler) buffer out-of-order Append buffers to reassemble more quickly
// TODO(tyler) handle granular batch sizes based on simple flow control
// TODO(tyler) add "application" test that asserts that all state machines apply the same items in-order
// TODO(tyler) fix disparity between 1-based indexing in raft paper and log's index
// TODO(tyler) make rng thread-local to facilitate determinism despite non-deterministic mutex races
#include <chrono>
#include <deque>
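The last TODO above, making RNGs thread-local so that determinism survives nondeterministic mutex races, could look roughly like the following. This is a sketch of the idea, not code from this commit:

#include <cstdint>
#include <random>

// One engine per thread: draws made by thread A can no longer
// perturb the stream seen by thread B, regardless of lock order.
thread_local std::mt19937 tls_rng;

// Seed each thread deterministically before it starts doing work,
// e.g. from a master rng at spawn time.
void SeedThisThread(uint64_t seed) { tls_rng.seed(seed); }

uint64_t ThreadLocalRand() {
  std::uniform_int_distribution<uint64_t> distrib{};
  return distrib(tls_rng);
}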
@@ -44,10 +46,10 @@ struct ReplicationResponse {
struct AppendRequest {
Term term;
Term last_log_index;
LogIndex last_log_index;
Term last_log_term;
std::vector<std::pair<Term, Op>> entries;
Term leader_commit;
LogIndex leader_commit;
};
struct AppendResponse {
@@ -58,7 +60,7 @@ struct AppendResponse {
// the leader the offset that we are interested in
// having log entries sent from. This will only
// be useful at the beginning of a leader's term.
Term last_log_index;
LogIndex last_log_index;
};
struct VoteRequest {
@@ -69,14 +71,14 @@ struct VoteRequest {
struct VoteResponse {
Term term;
LogIndex commit_index;
LogIndex committed_log_size;
bool vote_granted;
};
struct CommonState {
Term term = 0;
std::vector<std::pair<Term, Op>> log;
LogIndex commit_index = 0;
LogIndex committed_log_size = 0;
LogIndex last_applied = 0;
};
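The rename from commit_index to committed_log_size makes the 0-based/1-based disparity flagged in the TODOs explicit: the field counts committed entries, so the committed prefix is log[0, committed_log_size) and the last committed entry sits at committed_log_size - 1. A small illustration of the convention, with stand-in types and hypothetical values:

#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using Term = uint64_t;  // stand-ins; the real project defines these
using LogIndex = uint64_t;
using Op = std::vector<uint8_t>;

int main() {
  std::vector<std::pair<Term, Op>> log = {{1, {}}, {1, {}}, {2, {}}};
  LogIndex committed_log_size = 2;  // entries 0 and 1 are committed

  assert(log.size() >= committed_log_size);
  if (committed_log_size > 0) {
    // The last committed entry lives at size - 1, as CommittedLogTerm
    // does below; a size of 0 means nothing is committed yet.
    Term last_committed_term = log.at(committed_log_size - 1).first;
    assert(last_committed_term == 1);
  }
  return 0;
}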
@@ -111,6 +113,11 @@ using Role = std::variant<Candidate, Leader, Follower>;
template <typename IoImpl>
class Server {
CommonState state_;
Role role_ = Candidate{};
Io<IoImpl> io_;
std::vector<Address> peers_;
public:
Server(Io<IoImpl> io, std::vector<Address> peers) : io_(io), peers_(peers) {}
@@ -119,7 +126,7 @@ class Server {
while (!io_.ShouldShutDown()) {
auto now = io_.Now();
Duration random_cron_interval = RandomTimeout(5000, 30000);
Duration random_cron_interval = RandomTimeout(500, 2000);
if (now - last_cron > random_cron_interval) {
Cron();
last_cron = now;
@@ -141,13 +148,8 @@ class Server {
}
private:
CommonState state_;
Role role_ = Candidate{};
Io<IoImpl> io_;
std::vector<Address> peers_;
void BumpCommitIndexAndReplyToClients(Leader &leader) {
// set the current commit_index based on the
// set the current committed_log_size based on the median confirmed index across the cluster
auto indices = std::vector<LogIndex>{state_.log.size()};
for (const auto &[addr, f] : leader.followers) {
indices.push_back(f.confirmed_contiguous_index);
@@ -157,13 +159,13 @@ class Server {
// 3 -> 2 (index 1)
// 4 -> 3 (index 2)
// 5 -> 3 (index 2)
state_.commit_index = indices[(indices.size() / 2)];
state_.committed_log_size = indices[(indices.size() / 2)];
Log("leader commit_index is now ", state_.commit_index);
Log("leader committed_log_size is now ", state_.committed_log_size);
while (!leader.pending_client_requests.empty()) {
auto &front = leader.pending_client_requests.front();
if (front.log_index <= state_.commit_index) {
if (front.log_index <= state_.committed_log_size) {
Log("Leader responding SUCCESS to client");
ReplicationResponse rr{
.success = true,
@@ -191,7 +193,7 @@ class Server {
.last_log_index = index,
.last_log_term = PreviousTermFromIndex(index),
.entries = entries,
.leader_commit = state_.commit_index,
.leader_commit = state_.committed_log_size,
};
// request_id not necessary to set because it's not a Future-backed Request.
@@ -215,13 +217,15 @@ class Server {
}
}
LogIndex CommittedLogIndex() { return state_.commit_index; }
LogIndex CommittedLogIndex() { return state_.committed_log_size; }
Term CommittedLogTerm() {
if (state_.log.empty()) {
MG_ASSERT(state_.log.size() >= state_.committed_log_size);
if (state_.log.empty() || state_.committed_log_size == 0) {
return 0;
} else {
auto &[term, data] = state_.log.at(state_.commit_index);
Log("log len: ", state_.log.size(), " committed_log_size: ", state_.committed_log_size);
auto &[term, data] = state_.log.at(state_.committed_log_size - 1);
return term;
}
}
@@ -343,7 +347,7 @@ class Server {
VoteResponse res{
.term = std::max(req.term, state_.term),
.commit_index = state_.commit_index,
.committed_log_size = state_.committed_log_size,
.vote_granted = new_leader,
};
@@ -379,15 +383,15 @@ class Server {
MG_ASSERT(!candidate.successful_votes.contains(from_address),
"Received unexpected VoteResponse from server already in Candidate's successful_votes!");
candidate.successful_votes.insert({from_address, res.commit_index});
candidate.successful_votes.insert({from_address, res.committed_log_size});
if (candidate.successful_votes.size() >= candidate.outstanding_votes.size()) {
std::map<Address, FollowerTracker> followers{};
for (const auto &[address, commit_index] : candidate.successful_votes) {
for (const auto &[address, committed_log_size] : candidate.successful_votes) {
FollowerTracker follower{
.next_index = commit_index,
.confirmed_contiguous_index = commit_index,
.next_index = committed_log_size,
.confirmed_contiguous_index = committed_log_size,
};
followers.insert({address, std::move(follower)});
}
@@ -457,6 +461,8 @@ class Server {
res.success = false;
Cron();
io_.Send(from_address, request_id, res);
return std::nullopt;
@@ -513,7 +519,7 @@ class Server {
state_.log.insert(state_.log.end(), req.entries.begin(), req.entries.end());
state_.commit_index = std::min(req.leader_commit, LastLogIndex());
state_.committed_log_size = std::min(req.leader_commit, LastLogIndex());
res.success = true;
res.last_log_term = LastLogTerm();
@@ -562,7 +568,7 @@ void RunSimulation() {
auto config = SimulatorConfig{
.drop_percent = 0,
.perform_timeouts = true,
.scramble_messages = true,
.scramble_messages = false,
.rng_seed = 0,
};
@@ -573,10 +579,10 @@ void RunSimulation() {
auto srv_addr_2 = Address::TestAddress(3);
auto srv_addr_3 = Address::TestAddress(4);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr, false);
Io<SimulatorTransport> srv_io_1 = simulator.Register(srv_addr_1, true);
Io<SimulatorTransport> srv_io_2 = simulator.Register(srv_addr_2, true);
Io<SimulatorTransport> srv_io_3 = simulator.Register(srv_addr_3, true);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
Io<SimulatorTransport> srv_io_1 = simulator.Register(srv_addr_1);
Io<SimulatorTransport> srv_io_2 = simulator.Register(srv_addr_2);
Io<SimulatorTransport> srv_io_3 = simulator.Register(srv_addr_3);
std::vector<Address> srv_1_peers = {srv_addr_2, srv_addr_3};
std::vector<Address> srv_2_peers = {srv_addr_1, srv_addr_3};
@@ -587,8 +593,13 @@ void RunSimulation() {
Server srv_3{srv_io_3, srv_3_peers};
auto srv_thread_1 = std::jthread(RunServer<SimulatorTransport>, std::move(srv_1));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_1);
auto srv_thread_2 = std::jthread(RunServer<SimulatorTransport>, std::move(srv_2));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_2);
auto srv_thread_3 = std::jthread(RunServer<SimulatorTransport>, std::move(srv_3));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_3);
bool success = false;
Address leader = srv_addr_1;
@@ -598,7 +609,7 @@ void RunSimulation() {
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
ResponseFuture<ReplicationResponse> response_future =
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(srv_addr_1, cli_req, 100);
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(srv_addr_3, cli_req, 100);
// receive response
ResponseResult<ReplicationResponse> response_result = response_future.Wait();
@@ -621,6 +632,7 @@ void RunSimulation() {
std::cout << "========================== SUCCESS :) ==========================" << std::endl;
/*
this is implicit in jthread's dtor
srv_thread_1.join();
srv_thread_2.join();
srv_thread_3.join();
@@ -628,10 +640,14 @@ void RunSimulation() {
}
int main() {
for (int i = 0; i < 1; i++) {
int n_tests = 500;
for (int i = 0; i < n_tests; i++) {
std::cout << "========================== NEW SIMULATION ==========================" << std::endl;
RunSimulation();
}
std::cout << "passed " << n_tests << " tests!" << std::endl;
return 0;
}
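One natural extension, given that rng_seed stays pinned to 0 across all 500 iterations above: vary the seed per iteration so each run explores a different schedule while every failure remains replayable. A hedged sketch, assuming a hypothetical RunSimulation(uint64_t seed) variant that forwards the seed into SimulatorConfig::rng_seed:

// Sketch only, not from this commit: assumes a RunSimulation overload
// that threads `seed` into SimulatorConfig::rng_seed.
#include <cstdint>
#include <iostream>

void RunSimulation(uint64_t seed);  // hypothetical

int main() {
  const int n_tests = 500;
  for (int i = 0; i < n_tests; i++) {
    // The loop index doubles as the seed: a failing run can be
    // replayed exactly by re-running with the printed seed.
    uint64_t seed = static_cast<uint64_t>(i);
    std::cout << "=== simulation with seed " << seed << " ===" << std::endl;
    RunSimulation(seed);
  }
  std::cout << "passed " << n_tests << " seeds!" << std::endl;
  return 0;
}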