Continue codebase clean-up in preparation for merge
This commit is contained in:
parent
1ef11a36f4
commit
fbd015d3c6
@ -12,9 +12,13 @@
|
||||
#pragma once
|
||||
|
||||
// Signifies that a retriable operation was unable to
|
||||
// (fully) complete after a configured number of retries.
|
||||
// complete after a configured number of retries.
|
||||
struct RetriesExhausted {};
|
||||
|
||||
// Signifies that an operation was unable
|
||||
// to (fully) complete after a configured duration.
|
||||
// Signifies that a request was unable to receive a response
|
||||
// within some configured timeout duration. It is important
|
||||
// to remember that in distributed systems, a timeout does
|
||||
// not signify that a request was not received or processed.
|
||||
// It may be the case that the request was fully processed
|
||||
// but that the response was not received.
|
||||
struct TimedOut {};
|
||||
|
@ -12,56 +12,11 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
|
||||
#include "io/v3/address.hpp"
|
||||
#include "io/v3/errors.hpp"
|
||||
#include "io/v3/future.hpp"
|
||||
#include "io/v3/simulator_config.hpp"
|
||||
#include "io/v3/simulator_handle.hpp"
|
||||
#include "io/v3/transport.hpp"
|
||||
|
||||
class SimulatorTransport {
|
||||
std::shared_ptr<SimulatorHandle> simulator_handle_;
|
||||
Address address_;
|
||||
std::mt19937 rng_{};
|
||||
|
||||
public:
|
||||
SimulatorTransport(std::shared_ptr<SimulatorHandle> simulator_handle, Address address, uint64_t seed)
|
||||
: simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
|
||||
|
||||
template <Message Request, Message Response>
|
||||
ResponseFuture<Response> Request(Address address, uint64_t request_id, Request request,
|
||||
uint64_t timeout_microseconds) {
|
||||
std::function<bool()> maybe_tick_simulator = [=] { return simulator_handle_->MaybeTickSimulator(); };
|
||||
auto [future, promise] = FuturePromisePairWithNotifier<ResponseResult<Response>>(maybe_tick_simulator);
|
||||
|
||||
simulator_handle_->SubmitRequest(address, address_, request_id, std::move(request), timeout_microseconds,
|
||||
std::move(promise));
|
||||
|
||||
return std::move(future);
|
||||
}
|
||||
|
||||
template <Message... Ms>
|
||||
requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(uint64_t timeout_microseconds) {
|
||||
return simulator_handle_->template Receive<Ms...>(address_, timeout_microseconds);
|
||||
}
|
||||
|
||||
template <Message M>
|
||||
void Send(Address address, uint64_t request_id, M message) {
|
||||
return simulator_handle_->template Send<M>(address, address_, request_id, message);
|
||||
}
|
||||
|
||||
uint64_t Now() { return simulator_handle_->Now(); }
|
||||
|
||||
bool ShouldShutDown() { return simulator_handle_->ShouldShutDown(); }
|
||||
|
||||
template <class D = std::poisson_distribution<>, class Return = uint64_t>
|
||||
Return Rand(D distrib) {
|
||||
return distrib(rng_);
|
||||
}
|
||||
};
|
||||
#include "io/v3/simulator_transport.hpp"
|
||||
|
||||
class Simulator {
|
||||
std::mt19937 rng_{};
|
||||
|
@ -261,6 +261,9 @@ class SimulatorHandle {
|
||||
std::cout << "timing out request" << std::endl;
|
||||
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
|
||||
promises_.erase(promise_key);
|
||||
|
||||
stats_.timed_out_requests++;
|
||||
|
||||
dop.promise.TimeOut();
|
||||
}
|
||||
}
|
||||
@ -337,6 +340,7 @@ class SimulatorHandle {
|
||||
bool normal_timeout = config_.perform_timeouts && (dop.deadline < cluster_wide_time_microseconds_);
|
||||
|
||||
if (should_drop || normal_timeout) {
|
||||
stats_.timed_out_requests++;
|
||||
dop.promise.TimeOut();
|
||||
} else {
|
||||
stats_.total_responses++;
|
||||
|
@ -14,6 +14,7 @@
|
||||
struct SimulatorStats {
|
||||
uint64_t total_messages = 0;
|
||||
uint64_t dropped_messages = 0;
|
||||
uint64_t timed_out_requests = 0;
|
||||
uint64_t total_requests = 0;
|
||||
uint64_t total_responses = 0;
|
||||
uint64_t simulator_ticks = 0;
|
||||
|
@ -16,7 +16,6 @@
|
||||
// TODO(tyler) fix disparity between 1-based indexing in raft paper and log's index
|
||||
// TODO(tyler) make rng thread-local to facilitate determinism despite non-deterministic mutex races
|
||||
|
||||
#include <chrono>
|
||||
#include <deque>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
@ -171,7 +170,7 @@ class Server {
|
||||
auto indices = std::vector<LogIndex>{state_.log.size()};
|
||||
for (const auto &[addr, f] : leader.followers) {
|
||||
indices.push_back(f.confirmed_contiguous_index);
|
||||
Log("Follower at port ", (int)addr.last_known_port,
|
||||
Log("Follower at port ", addr.last_known_port,
|
||||
" has confirmed contiguous index of: ", f.confirmed_contiguous_index);
|
||||
}
|
||||
std::ranges::sort(indices, std::ranges::greater());
|
||||
@ -283,7 +282,7 @@ class Server {
|
||||
|
||||
if (now - candidate.election_began > election_timeout) {
|
||||
state_.term++;
|
||||
Log("becoming Candidate for term ", (int)state_.term, " after leader timeout of ", (int)election_timeout,
|
||||
Log("becoming Candidate for term ", state_.term, " after leader timeout of ", election_timeout,
|
||||
" elapsed since last election attempt");
|
||||
|
||||
VoteRequest request{
|
||||
@ -335,7 +334,7 @@ class Server {
|
||||
BroadcastAppendEntries(leader.followers);
|
||||
leader.last_broadcast = now;
|
||||
}
|
||||
// TODO TimeOutOldClientRequests();
|
||||
// TODO(tyler) TimeOutOldClientRequests();
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
@ -359,7 +358,7 @@ class Server {
|
||||
std::visit([&](auto &&msg, auto &&role) { return Handle(role, std::move(msg), request_id, from_address); },
|
||||
std::move(message_variant), role_);
|
||||
|
||||
// TODO(m3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc...
|
||||
// TODO(tyler) (M3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc...
|
||||
if (new_role) {
|
||||
role_ = std::move(new_role).value();
|
||||
}
|
||||
@ -368,7 +367,7 @@ class Server {
|
||||
// all roles can receive Vote and possibly become a follower
|
||||
template <typename AllRoles>
|
||||
std::optional<Role> Handle(AllRoles &, VoteRequest &&req, RequestId request_id, Address from_address) {
|
||||
Log("received Vote from ", (int)from_address.last_known_port, " with term ", req.term);
|
||||
Log("received Vote from ", from_address.last_known_port, " with term ", req.term);
|
||||
bool last_log_term_dominates = req.last_log_term >= LastLogTerm();
|
||||
bool term_dominates = req.term > state_.term;
|
||||
bool last_log_index_dominates = req.last_log_index >= LastLogIndex();
|
||||
@ -439,7 +438,7 @@ class Server {
|
||||
followers.insert({address, std::move(follower)});
|
||||
}
|
||||
|
||||
Log("becoming Leader at term ", (int)state_.term);
|
||||
Log("becoming Leader at term ", state_.term);
|
||||
|
||||
BroadcastAppendEntries(followers);
|
||||
|
||||
@ -475,7 +474,7 @@ class Server {
|
||||
|
||||
BroadcastAppendEntries(leader.followers);
|
||||
|
||||
// TODO add message to pending requests buffer, reply asynchronously
|
||||
// TODO(tyler) add message to pending requests buffer, reply asynchronously
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
@ -528,7 +527,7 @@ class Server {
|
||||
|
||||
io_.Send(from_address, request_id, res);
|
||||
|
||||
Log("becoming Follower of Leader ", (int)from_address.last_known_port, " at term ", (int)req.term);
|
||||
Log("becoming Follower of Leader ", from_address.last_known_port, " at term ", req.term);
|
||||
return Follower{
|
||||
.last_received_append_entries_timestamp = now,
|
||||
.leader_address = from_address,
|
||||
@ -538,7 +537,7 @@ class Server {
|
||||
io_.Send(from_address, request_id, res);
|
||||
|
||||
return std::nullopt;
|
||||
};
|
||||
}
|
||||
|
||||
// at this point, we're dealing with our own leader
|
||||
|
||||
@ -593,11 +592,11 @@ class Server {
|
||||
MG_ASSERT(false, "received AppendResponse from unknown Follower");
|
||||
} else {
|
||||
if (res.success) {
|
||||
Log("Leader got successful AppendResponse from ", (int)from_address.last_known_port, " with last_log_index of ",
|
||||
(int)res.last_log_index);
|
||||
Log("Leader got successful AppendResponse from ", from_address.last_known_port, " with last_log_index of ",
|
||||
res.last_log_index);
|
||||
} else {
|
||||
Log("Leader got unsuccessful AppendResponse from ", (int)from_address.last_known_port,
|
||||
" with last_log_index of ", (int)res.last_log_index);
|
||||
Log("Leader got unsuccessful AppendResponse from ", from_address.last_known_port, " with last_log_index of ",
|
||||
res.last_log_index);
|
||||
}
|
||||
FollowerTracker &follower = leader.followers.at(from_address);
|
||||
follower.next_index = std::max(follower.next_index, res.last_log_index);
|
||||
@ -619,7 +618,7 @@ class Server {
|
||||
Time now = io_.Now();
|
||||
Term term = state_.term;
|
||||
|
||||
std::cout << '\t' << (int)now << "\t" << (int)term << "\t" << (int)io_.GetAddress().last_known_port;
|
||||
std::cout << '\t' << now << "\t" << term << "\t" << io_.GetAddress().last_known_port;
|
||||
|
||||
std::visit([&](auto &&role) { role.Print(); }, role_);
|
||||
|
||||
@ -683,7 +682,7 @@ void RunSimulation() {
|
||||
ReplicationRequest cli_req;
|
||||
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
|
||||
|
||||
std::cout << "client sending ReplicationRequest to Leader " << (int)leader.last_known_port << std::endl;
|
||||
std::cout << "client sending ReplicationRequest to Leader " << leader.last_known_port << std::endl;
|
||||
ResponseFuture<ReplicationResponse> response_future =
|
||||
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(leader, cli_req, 50000);
|
||||
|
||||
@ -711,8 +710,8 @@ void RunSimulation() {
|
||||
size_t addr_index = addr_distrib(cli_rng_);
|
||||
leader = server_addrs[addr_index];
|
||||
|
||||
std::cout << "client NOT redirected to leader server, trying a random one at index " << (int)addr_index
|
||||
<< " with port " << (int)leader.last_known_port << std::endl;
|
||||
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
|
||||
<< " with port " << leader.last_known_port << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
@ -722,11 +721,12 @@ void RunSimulation() {
|
||||
|
||||
SimulatorStats stats = simulator.Stats();
|
||||
|
||||
std::cout << "total messages: " << (int)stats.total_messages << std::endl;
|
||||
std::cout << "dropped messages: " << (int)stats.dropped_messages << std::endl;
|
||||
std::cout << "total requests: " << (int)stats.total_requests << std::endl;
|
||||
std::cout << "total responses: " << (int)stats.total_responses << std::endl;
|
||||
std::cout << "simulator ticks: " << (int)stats.simulator_ticks << std::endl;
|
||||
std::cout << "total messages: " << stats.total_messages << std::endl;
|
||||
std::cout << "dropped messages: " << stats.dropped_messages << std::endl;
|
||||
std::cout << "timed out requests: " << stats.timed_out_requests << std::endl;
|
||||
std::cout << "total requests: " << stats.total_requests << std::endl;
|
||||
std::cout << "total responses: " << stats.total_responses << std::endl;
|
||||
std::cout << "simulator ticks: " << stats.simulator_ticks << std::endl;
|
||||
|
||||
std::cout << "========================== SUCCESS :) ==========================" << std::endl;
|
||||
|
||||
@ -739,7 +739,7 @@ void RunSimulation() {
|
||||
}
|
||||
|
||||
int main() {
|
||||
int n_tests = 500;
|
||||
int n_tests = 50;
|
||||
|
||||
for (int i = 0; i < n_tests; i++) {
|
||||
std::cout << "========================== NEW SIMULATION " << i << " ==========================" << std::endl;
|
||||
|
Loading…
Reference in New Issue
Block a user