Extract BlockedServers functionality from MaybeTickSimulator and IncrementServerCountAndWaitForQuiescentState

This commit is contained in:
Tyler Neely 2022-08-02 14:30:12 +00:00
parent 4f06eb0f2f
commit 54369958d1
2 changed files with 18 additions and 12 deletions

View File

@ -38,15 +38,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
server_addresses_.insert(address);
while (true) {
size_t blocked_servers = blocked_on_receive_;
for (auto &[promise_key, opaque_promise] : promises_) {
if (opaque_promise.promise.IsAwaited()) {
if (server_addresses_.contains(promise_key.requester_address)) {
blocked_servers++;
}
}
}
size_t blocked_servers = BlockedServers();
bool all_servers_blocked = blocked_servers == server_addresses_.size();
@ -58,9 +50,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
}
}
bool SimulatorHandle::MaybeTickSimulator() {
std::unique_lock<std::mutex> lock(mu_);
size_t SimulatorHandle::BlockedServers() {
size_t blocked_servers = blocked_on_receive_;
for (auto &[promise_key, opaque_promise] : promises_) {
@ -71,6 +61,14 @@ bool SimulatorHandle::MaybeTickSimulator() {
}
}
return blocked_servers;
}
bool SimulatorHandle::MaybeTickSimulator() {
std::unique_lock<std::mutex> lock(mu_);
size_t blocked_servers = BlockedServers();
if (blocked_servers < server_addresses_.size()) {
// we only need to advance the simulator when all
// servers have reached a quiescent state, blocked

View File

@ -82,6 +82,14 @@ class SimulatorHandle {
std::mt19937 rng_;
SimulatorConfig config_;
/// Returns the number of servers currently blocked on Receive, plus
/// the servers that are blocked on Futures that were created through
/// SimulatorTransport::Request.
///
/// TODO(tyler) investigate whether avoiding consideration of Futures
/// increases determinism.
size_t BlockedServers();
void TimeoutPromisesPastDeadline() {
const Time now = cluster_wide_time_microseconds_;