Fix a few bugs, expose stats
This commit is contained in:
parent
afef6dc11b
commit
581925e660
@ -80,4 +80,6 @@ class Simulator {
|
||||
/// Registers `address` with the underlying simulator handle by forwarding to
/// the handle's method of the same name. Returns when that call returns.
void IncrementServerCountAndWaitForQuiescentState(Address address) {
  auto &handle = *simulator_handle_;
  handle.IncrementServerCountAndWaitForQuiescentState(address);
}
|
||||
|
||||
/// Returns the statistics reported by the underlying simulator handle.
SimulatorStats Stats() {
  SimulatorStats snapshot = simulator_handle_->Stats();
  return snapshot;
}
|
||||
};
|
||||
|
@ -218,11 +218,12 @@ class SimulatorHandle {
|
||||
SimulatorStats stats_;
|
||||
size_t blocked_on_receive_ = 0;
|
||||
std::set<Address> server_addresses_;
|
||||
std::mt19937 rng_;
|
||||
SimulatorConfig config_;
|
||||
std::mt19937 rng_{};
|
||||
|
||||
public:
|
||||
SimulatorHandle(SimulatorConfig config) : cluster_wide_time_microseconds_(config.start_time), config_(config) {}
|
||||
SimulatorHandle(SimulatorConfig config)
|
||||
: cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {}
|
||||
|
||||
void IncrementServerCountAndWaitForQuiescentState(Address address) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
@ -245,11 +246,6 @@ class SimulatorHandle {
|
||||
return;
|
||||
}
|
||||
|
||||
std::cout << "only " << (int)blocked_servers << " servers blocked, but size is " << (int)server_addresses_.size()
|
||||
<< std::endl;
|
||||
|
||||
// __asm__ __volatile__("yield");
|
||||
|
||||
cv_.wait(lock);
|
||||
}
|
||||
}
|
||||
@ -271,10 +267,11 @@ class SimulatorHandle {
|
||||
// we only need to advance the simulator when all
|
||||
// servers have reached a quiescent state, blocked
|
||||
// on their own futures or receive methods.
|
||||
// std::cout << "returning from tick: blocked servers less than total servers" << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
stats_.simulator_ticks++;
|
||||
|
||||
cv_.notify_all();
|
||||
|
||||
if (in_flight_.empty()) {
|
||||
@ -306,6 +303,10 @@ class SimulatorHandle {
|
||||
int drop_threshold = drop_distrib(rng_);
|
||||
bool should_drop = drop_threshold < config_.drop_percent;
|
||||
|
||||
if (should_drop) {
|
||||
stats_.dropped_messages++;
|
||||
}
|
||||
|
||||
PromiseKey promise_key{.requester_address = to_address,
|
||||
.request_id = opaque_message.request_id,
|
||||
.replier_address = opaque_message.from_address};
|
||||
@ -320,10 +321,20 @@ class SimulatorHandle {
|
||||
if (should_drop || normal_timeout) {
|
||||
dop.promise.TimeOut();
|
||||
} else {
|
||||
stats_.total_responses++;
|
||||
dop.promise.Fill(std::move(opaque_message));
|
||||
}
|
||||
} else if (should_drop) {
|
||||
// don't add it anywhere, let it drop
|
||||
// don't add it anywhere, let it drop, if it's a request then time it out
|
||||
// TODO queue this up and drop it after its deadline
|
||||
PromiseKey drop_promise_key{.requester_address = opaque_message.from_address,
|
||||
.request_id = opaque_message.request_id,
|
||||
.replier_address = to_address};
|
||||
if (promises_.contains(drop_promise_key)) {
|
||||
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
|
||||
promises_.erase(promise_key);
|
||||
dop.promise.TimeOut();
|
||||
}
|
||||
} else {
|
||||
// add to can_receive_ if not
|
||||
const auto &[om_vec, inserted] = can_receive_.try_emplace(to_address, std::vector<OpaqueMessage>());
|
||||
@ -335,7 +346,6 @@ class SimulatorHandle {
|
||||
|
||||
void ShutDown() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
std::cout << "Shutting down" << std::endl;
|
||||
should_shut_down_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
@ -361,8 +371,8 @@ class SimulatorHandle {
|
||||
DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)};
|
||||
promises_.emplace(std::move(promise_key), std::move(dop));
|
||||
|
||||
stats_.total_messages_++;
|
||||
stats_.total_requests_++;
|
||||
stats_.total_messages++;
|
||||
stats_.total_requests++;
|
||||
|
||||
cv_.notify_all();
|
||||
|
||||
@ -394,7 +404,6 @@ class SimulatorHandle {
|
||||
bool made_progress = MaybeTickSimulator();
|
||||
lock.lock();
|
||||
if (!should_shut_down_ && !made_progress) {
|
||||
// std::cout << "waiting on cv" << std::endl;
|
||||
cv_.wait(lock);
|
||||
}
|
||||
blocked_on_receive_ -= 1;
|
||||
@ -410,7 +419,7 @@ class SimulatorHandle {
|
||||
OpaqueMessage om{.from_address = from_address, .request_id = request_id, .message = std::move(message_any)};
|
||||
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
|
||||
|
||||
stats_.total_messages_++;
|
||||
stats_.total_messages++;
|
||||
|
||||
cv_.notify_all();
|
||||
}
|
||||
@ -425,4 +434,9 @@ class SimulatorHandle {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return distrib(rng_);
|
||||
}
|
||||
|
||||
/// Returns a by-value copy of the counters in `stats_`; the copy is made
/// while `mu_` is held.
SimulatorStats Stats() {
  std::unique_lock<std::mutex> guard{mu_};
  SimulatorStats snapshot = stats_;
  return snapshot;
}
|
||||
};
|
||||
|
@ -12,9 +12,9 @@
|
||||
#pragma once
|
||||
|
||||
struct SimulatorStats {
|
||||
uint64_t total_messages_;
|
||||
uint64_t dropped_messages_;
|
||||
uint64_t total_requests_;
|
||||
uint64_t total_responses_;
|
||||
uint64_t simulator_ticks_;
|
||||
uint64_t total_messages = 0;
|
||||
uint64_t dropped_messages = 0;
|
||||
uint64_t total_requests = 0;
|
||||
uint64_t total_responses = 0;
|
||||
uint64_t simulator_ticks = 0;
|
||||
};
|
||||
|
@ -318,7 +318,7 @@ class Server {
|
||||
// Leaders (re)send AppendRequest to followers.
|
||||
std::optional<Role> Cron(Leader &leader) {
|
||||
Time now = io_.Now();
|
||||
Duration broadcast_timeout = RandomTimeout(20000, 30000);
|
||||
Duration broadcast_timeout = RandomTimeout(40000, 60000);
|
||||
|
||||
if (now - leader.last_broadcast > broadcast_timeout) {
|
||||
BroadcastAppendEntries(leader.followers);
|
||||
@ -383,6 +383,10 @@ class Server {
|
||||
.last_received_append_entries_timestamp = io_.Now(),
|
||||
.leader_address = from_address,
|
||||
};
|
||||
} else if (term_dominates) {
|
||||
Log("received a vote from an inferior candidate. Becoming Candidate");
|
||||
state_.term = std::max(state_.term, req.term) + 1;
|
||||
return Candidate{};
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
@ -624,7 +628,7 @@ void RunSimulation() {
|
||||
.scramble_messages = true,
|
||||
.rng_seed = 0,
|
||||
.start_time = 200000,
|
||||
.abort_time = 1 * 1024 * 1024,
|
||||
.abort_time = 8 * 1024 * 1024,
|
||||
};
|
||||
|
||||
auto simulator = Simulator(config);
|
||||
@ -663,7 +667,7 @@ void RunSimulation() {
|
||||
bool success = false;
|
||||
Address leader = server_addrs[0];
|
||||
|
||||
for (int retries = 0; retries < 100; retries++) {
|
||||
while (true) {
|
||||
// send request
|
||||
ReplicationRequest cli_req;
|
||||
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
|
||||
@ -704,6 +708,15 @@ void RunSimulation() {
|
||||
MG_ASSERT(success);
|
||||
|
||||
simulator.ShutDown();
|
||||
|
||||
SimulatorStats stats = simulator.Stats();
|
||||
|
||||
std::cout << "total messages: " << (int)stats.total_messages << std::endl;
|
||||
std::cout << "dropped messages: " << (int)stats.dropped_messages << std::endl;
|
||||
std::cout << "total requests: " << (int)stats.total_requests << std::endl;
|
||||
std::cout << "total responses: " << (int)stats.total_responses << std::endl;
|
||||
std::cout << "simulator ticks: " << (int)stats.simulator_ticks << std::endl;
|
||||
|
||||
std::cout << "========================== SUCCESS :) ==========================" << std::endl;
|
||||
|
||||
/*
|
||||
|
Loading…
Reference in New Issue
Block a user