Streamline simulator tick condition varible notification. Advance time more aggressively
This commit is contained in:
parent
923325b8fa
commit
6b9a617df0
@ -79,26 +79,19 @@ bool SimulatorHandle::MaybeTickSimulator() {
|
||||
// after all servers are blocked on receive.
|
||||
spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
|
||||
stats_.simulator_ticks++;
|
||||
blocked_on_receive_.clear();
|
||||
cv_.notify_all();
|
||||
|
||||
bool fired_cv = false;
|
||||
bool timed_anything_out = TimeoutPromisesPastDeadline();
|
||||
|
||||
if (timed_anything_out) {
|
||||
spdlog::info("simulator progressing: timed out a request");
|
||||
fired_cv = true;
|
||||
blocked_on_receive_.clear();
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
|
||||
cluster_wide_time_microseconds_ += clock_advance;
|
||||
|
||||
if (!fired_cv) {
|
||||
spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
|
||||
cv_.notify_all();
|
||||
blocked_on_receive_.clear();
|
||||
fired_cv = true;
|
||||
}
|
||||
spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
|
||||
|
||||
if (cluster_wide_time_microseconds_ >= config_.abort_time) {
|
||||
if (should_shut_down_) {
|
||||
@ -143,37 +136,27 @@ bool SimulatorHandle::MaybeTickSimulator() {
|
||||
if (should_drop || normal_timeout) {
|
||||
stats_.timed_out_requests++;
|
||||
dop.promise.TimeOut();
|
||||
spdlog::info("timing out request ");
|
||||
spdlog::info("simulator timing out request ");
|
||||
} else {
|
||||
stats_.total_responses++;
|
||||
Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
|
||||
auto type_info = opaque_message.type_info;
|
||||
dop.promise.Fill(std::move(opaque_message), response_latency);
|
||||
histograms_.Measure(type_info, response_latency);
|
||||
spdlog::info("replying to request");
|
||||
spdlog::info("simulator replying to request");
|
||||
}
|
||||
} else if (should_drop) {
|
||||
// don't add it anywhere, let it drop
|
||||
spdlog::info("silently dropping request");
|
||||
|
||||
// we don't want to reset the block list here
|
||||
return true;
|
||||
spdlog::info("simulator silently dropping request");
|
||||
} else {
|
||||
// add to can_receive_ if not
|
||||
spdlog::info("adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
|
||||
spdlog::info("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
|
||||
opaque_message.to_address.last_known_port);
|
||||
const auto &[om_vec, inserted] =
|
||||
can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>());
|
||||
om_vec->second.emplace_back(std::move(opaque_message));
|
||||
}
|
||||
|
||||
if (!fired_cv) {
|
||||
spdlog::info("simulator progressing: handled a message");
|
||||
cv_.notify_all();
|
||||
blocked_on_receive_.clear();
|
||||
fired_cv = true;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,7 @@ class SimulatorHandle {
|
||||
std::set<Address> blocked_on_receive_;
|
||||
std::set<Address> server_addresses_;
|
||||
std::mt19937 rng_;
|
||||
std::uniform_int_distribution<int> time_distrib_{0, 50};
|
||||
std::uniform_int_distribution<int> time_distrib_{0, 1000};
|
||||
std::uniform_int_distribution<int> drop_distrib_{0, 99};
|
||||
SimulatorConfig config_;
|
||||
MessageHistogramCollector histograms_;
|
||||
|
@ -275,8 +275,6 @@ int main() {
|
||||
spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
|
||||
std::terminate();
|
||||
}
|
||||
spdlog::info("run 1 simulator stats: {}", sim_stats_1);
|
||||
spdlog::info("run 2 simulator stats: {}", sim_stats_2);
|
||||
}
|
||||
|
||||
spdlog::info("passed {} tests!", n_tests);
|
||||
|
Loading…
Reference in New Issue
Block a user