Get most of the raft data path up and running

Tyler Neely 2022-07-14 18:45:24 +00:00
parent 5e98971bb2
commit d14f7705b1
2 changed files with 145 additions and 63 deletions
src/io/v3
tests/simulation

src/io/v3

@ -208,6 +208,8 @@ class SimulatorHandle {
bool MaybeTickSimulator() {
std::unique_lock<std::mutex> lock(mu_);
// TODO nuke this notify
cv_.notify_all();
size_t blocked_servers = blocked_on_receive_;
@ -219,21 +221,26 @@ class SimulatorHandle {
}
}
// TODO this is not deterministic time advancement
// clock ticks forwards by this many microseconds on average
std::poisson_distribution<> time_distrib(100);
uint64_t clock_advance = time_distrib(rng_);
cluster_wide_time_microseconds_ += clock_advance;
if (blocked_servers < servers_) {
// we only need to advance the simulator when all
// servers have reached a quiescent state, blocked
// on their own futures or receive methods.
// std::cout << "returning from tick: blocked servers less than total servers" << std::endl;
return false;
}
if (in_flight_.empty()) {
// std::cout << "returning from tick: empty in_flight_" << std::endl;
return false;
}
// clock ticks forwards by 400 microseconds on average
std::poisson_distribution<> time_distrib(400);
uint64_t clock_advance = time_distrib(rng_);
cluster_wide_time_microseconds_ += clock_advance;
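// (a standalone sketch of this advance pattern, assuming rng_ is a seeded
// std::mt19937: with a fixed seed the sequence of advances is identical on
// every run, which keeps simulated time reproducible
//   std::mt19937 rng(42);
//   std::poisson_distribution<> time_distrib(400);  // mean of 400 microseconds
//   uint64_t now_us = 0;
//   now_us += time_distrib(rng);  // jittered but deterministic tick
// )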
// std::cout << "looking at message in tick" << std::endl;
if (config_.scramble_messages) {
// scramble messages

tests/simulation

@ -9,6 +9,12 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// TODO(tyler) buffer out-of-order Append messages to reassemble the log more quickly
// TODO(tyler) handle granular batch sizes based on simple flow control
// TODO(tyler) add "application" test that asserts that all state machines apply the same items in-order
#include <chrono>
#include <deque>
#include <iostream>
#include <map>
#include <set>
@ -45,6 +51,7 @@ struct AppendRequest {
struct AppendResponse {
bool success;
Term term;
Term last_log_term;
// a small optimization over the Raft paper: tells
// the leader the log offset that we are interested in
@ -66,10 +73,10 @@ struct VoteResponse {
};
struct CommonState {
Term term;
Term term = 0;
std::vector<std::pair<Term, Op>> log;
LogIndex commit_index;
LogIndex last_applied;
LogIndex commit_index = 0;
LogIndex last_applied = 0;
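// (defaulted in-class so a fresh server never reads indeterminate values)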
};
struct FollowerTracker {
@ -77,8 +84,15 @@ struct FollowerTracker {
LogIndex confirmed_contiguous_index = 0;
};
struct PendingClientRequest {
LogIndex log_index;
RequestId request_id;
Address address;
};
struct Leader {
std::map<Address, FollowerTracker> followers;
std::deque<PendingClientRequest> pending_client_requests;
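// requests are appended in log order; BumpCommitIndexAndReplyToClients
// replies and pops from the front as commit_index advances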
};
struct Candidate {
@ -109,20 +123,16 @@ class Server {
while (!io_.ShouldShutDown()) {
auto now = io_.Now();
if (now - last_cron > cron_interval) {
Log("doing cron");
Cron();
last_cron = now;
}
Log("waiting for requests");
auto request_result = io_.template Receive<AppendRequest, VoteRequest, VoteResponse, ReplicationRequest>();
auto request_result =
io_.template Receive<AppendRequest, AppendResponse, ReplicationRequest, VoteRequest, VoteResponse>();
if (request_result.HasError()) {
continue;
}
Log("received request");
auto request = std::move(request_result.GetValue());
Handle(std::move(request.message), request.request_id, request.from_address);
@ -135,6 +145,49 @@ class Server {
Io<IoImpl> io_;
std::vector<Address> peers_;
void BumpCommitIndexAndReplyToClients(Leader &leader) {
// set the current commit_index to the highest log index confirmed by a
// majority of the cluster; indices is seeded with the leader's own log size
auto indices = std::vector<LogIndex>{state_.log.size()};
for (const auto &[addr, f] : leader.followers) {
indices.push_back(f.confirmed_contiguous_index);
}
std::ranges::sort(indices, std::ranges::greater());
// with the indices sorted in descending order (std::ranges::greater), the
// element at indices.size() / 2 is the highest index held by a majority:
// 3 members -> quorum of 2 -> indices[1]
// 4 members -> quorum of 3 -> indices[2]
// 5 members -> quorum of 3 -> indices[2]
state_.commit_index = indices[(indices.size() / 2)];
while (!leader.pending_client_requests.empty()) {
auto &front = leader.pending_client_requests.front();
if (front.log_index <= state_.commit_index) {
ReplicationResponse rr{
.success = true,
.retry_leader = std::nullopt,
};
io_.Send(front.address, front.request_id, std::move(rr));
leader.pending_client_requests.pop_front();
} else {
break;
}
}
}
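// To make the indices[indices.size() / 2] step above concrete, a sketch with
// hypothetical values for a 5-member cluster (leader log size 5, followers
// confirmed at 5, 3, 3, and 2):
//   std::vector<LogIndex> indices{5, 5, 3, 3, 2};       // leader included
//   std::ranges::sort(indices, std::ranges::greater()); // stays {5, 5, 3, 3, 2}
//   LogIndex commit = indices[indices.size() / 2];      // indices[2] == 3
// log position 3 is the highest entry stored on a majority (3 of 5) of the
// members, so it is safe to commit.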
void BroadcastAppendEntries(std::map<Address, FollowerTracker> &followers) {
AppendRequest ar{
.term = state_.term,
.last_log_index = 0,
.last_log_term = 0,
.entries = std::vector<std::pair<Term, Op>>(),
.leader_commit = state_.commit_index,
};
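// entries is empty and last_log_index is 0 above, so this broadcast currently
// behaves as a heartbeat; shipping real entries per follower presumably lands
// in a later commit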
for (auto &[address, follower] : followers) {
// request_id not necessary to set because it's not a Future-backed Request.
RequestId request_id = 0;
io_.Send(address, request_id, ar);
}
}
Duration RandomTimeout(Duration min, Duration max) {
std::uniform_int_distribution<> time_distrib(min, max);
return io_.Rand(time_distrib);
@ -164,11 +217,11 @@ class Server {
/// Periodic protocol maintenance.
void Cron() {
Log("running Cron");
// dispatch periodic logic based on our role to a specific Cron method.
std::optional<Role> new_role = std::visit([&](auto &&role) { return Cron(role); }, role_);
if (new_role) {
Log("becoming new role");
role_ = std::move(new_role).value();
}
}
@ -182,7 +235,6 @@ class Server {
Duration election_timeout = RandomTimeout(100000, 150000);
if (now - candidate.election_began > election_timeout) {
Log("running for election");
state_.term++;
VoteRequest request{
.term = state_.term,
@ -199,6 +251,7 @@ class Server {
outstanding_votes.insert(peer);
}
Log("becoming Candidate for term ", (int)state_.term);
return Candidate{
.successful_votes = std::map<Address, LogIndex>(),
.election_began = now,
@ -225,8 +278,10 @@ class Server {
}
// Leaders (re)send AppendRequest to followers.
std::optional<Role> Cron(Leader &) {
std::optional<Role> Cron(Leader &leader) {
// TODO time-out client requests if we haven't made progress after some threshold
Log("leader broadcasting");
BroadcastAppendEntries(leader.followers);
return std::nullopt;
}
@ -239,8 +294,9 @@ class Server {
/// to its role, and as the second argument, the
/// message that has been received.
/// **********************************************
void Handle(std::variant<AppendRequest, VoteRequest, VoteResponse, ReplicationRequest> &&message_variant,
RequestId request_id, Address from_address) {
void Handle(
std::variant<AppendRequest, AppendResponse, ReplicationRequest, VoteRequest, VoteResponse> &&message_variant,
RequestId request_id, Address from_address) {
// dispatch the message to a handler based on our role,
// which can be specified as the first argument of Handle,
// or left as `auto` if the same handler serves several roles
@ -251,7 +307,6 @@ class Server {
// TODO(m3) maybe replace std::visit with get_if for explicit prioritized matching, [[likely]] etc...
if (new_role) {
Log("becoming new role");
role_ = std::move(new_role).value();
}
}
@ -306,8 +361,6 @@ class Server {
candidate.successful_votes.insert({from_address, res.commit_index});
if (candidate.successful_votes.size() >= candidate.outstanding_votes.size()) {
Log("becoming Leader");
std::map<Address, FollowerTracker> followers{};
for (const auto &[address, commit_index] : candidate.successful_votes) {
@ -325,8 +378,12 @@ class Server {
followers.insert({address, follower});
}
BroadcastAppendEntries(followers);
Log("becoming Leader at term ", (int)state_.term);
return Leader{
.followers = std::move(followers),
.pending_client_requests = std::deque<PendingClientRequest>(),
};
}
}
@ -341,12 +398,20 @@ class Server {
}
// only leaders actually handle replication requests from clients
std::optional<Role> Handle(Leader &, ReplicationRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Leader &leader, ReplicationRequest &&req, RequestId request_id, Address from_address) {
Log("leader received ReplicationRequest");
// we are the leader: append the item to our log and queue the client
// request for a reply once the entry commits
state_.log.emplace_back(std::pair(state_.term, std::move(req.opaque_data)));
PendingClientRequest pcr{
.log_index = state_.log.size(),
.request_id = request_id,
.address = from_address,
};
leader.pending_client_requests.push_back(pcr);
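// log_index is log.size() taken after the emplace_back, i.e. the 1-based
// position of the new entry; commit_index is computed on the same scale, so
// the <= comparison in BumpCommitIndexAndReplyToClients lines up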
// TODO add message to pending requests buffer, reply asynchronously
return std::nullopt;
}
@ -374,8 +439,10 @@ class Server {
return std::nullopt;
}
std::optional<Role> Handle(Follower &follower, AppendRequest &&req, RequestId request_id, Address from_address) {
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &role, AppendRequest &&req, RequestId request_id, Address from_address) {
AppendResponse res{
.success = false,
.term = state_.term,
.last_log_term = CommittedLogTerm(),
.last_log_index = CommittedLogIndex(),
@ -388,6 +455,7 @@ class Server {
io_.Send(from_address, request_id, res);
Log("becoming Follower");
return Follower{
.last_received_append_entries_timestamp = io_.Now(),
.leader_address = from_address,
@ -399,8 +467,11 @@ class Server {
return std::nullopt;
};
MG_ASSERT(follower.leader_address == from_address, "Multiple Leaders are acting under the same term number!");
follower.last_received_append_entries_timestamp = io_.Now();
if constexpr (std::is_same<AllRoles, Follower>()) {
// small specialization for when we're already a Follower
MG_ASSERT(role.leader_address == from_address, "Multiple Leaders are acting under the same term number!");
role.last_received_append_entries_timestamp = io_.Now();
}
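// (the if constexpr block above is compiled only for the Follower
// instantiation of this template; Candidate and Leader instantiations drop it)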
// Handle steady-state conditions.
if (req.last_log_index != LastLogIndex()) {
@ -430,37 +501,25 @@ class Server {
return std::nullopt;
}
// Non-Followers can receive an AppendRequest and potentially be flipped to a follower state.
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &, AppendRequest &&req, RequestId request_id, Address from_address) {
Log("non-Follower received Append from a Leader");
std::optional<Role> Handle(Leader &leader, AppendResponse &&res, RequestId request_id, Address from_address) {
if (res.term != state_.term) {
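// stale response from a different term: ignore it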
} else if (!leader.followers.contains(from_address)) {
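// response from an address we are not tracking as a follower: ignore it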
} else if (!res.success) {
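// the follower rejected this append; nothing to advance yet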
} else {
Log("Leader got successful AppendResponse");
FollowerTracker &follower = leader.followers.at(from_address);
follower.next_index = std::max(follower.next_index, res.last_log_index);
follower.confirmed_contiguous_index = std::max(follower.confirmed_contiguous_index, res.last_log_index);
bool error = true;
std::optional<Role> ret = std::nullopt;
if (req.term > state_.term) {
// become follower of this leader, reply with our log status
error = false;
state_.term = req.term;
ret = Follower{
.last_received_append_entries_timestamp = io_.Now(),
.leader_address = from_address,
};
BumpCommitIndexAndReplyToClients(leader);
}
return std::nullopt;
}
AppendResponse res{
.success = error,
.last_log_term = CommittedLogTerm(),
.last_log_index = CommittedLogIndex(),
};
io_.Send(from_address, request_id, res);
return ret;
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &, AppendResponse &&res, RequestId request_id, Address from_address) {
// we used to be the leader, and are getting old delayed responses
return std::nullopt;
}
template <typename... Ts>
@ -507,19 +566,35 @@ int main() {
auto srv_thread_2 = std::jthread(RunServer<SimulatorTransport>, std::move(srv_2));
auto srv_thread_3 = std::jthread(RunServer<SimulatorTransport>, std::move(srv_3));
// send request
ReplicationRequest cli_req;
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ResponseFuture<ReplicationResponse> response_future =
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(srv_addr_1, cli_req, 100);
bool success = false;
Address leader = srv_addr_1;
for (int retries = 0; retries < 30; retries++) {
// send request
ReplicationRequest cli_req;
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
// receive response
ResponseResult<ReplicationResponse> response_result = response_future.Wait();
ResponseEnvelope<ReplicationResponse> response_envelope = response_result.GetValue();
ReplicationResponse response = response_envelope.message;
ResponseFuture<ReplicationResponse> response_future =
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(leader, cli_req, 100);
MG_ASSERT(response.success);
// receive response
ResponseResult<ReplicationResponse> response_result = response_future.Wait();
ResponseEnvelope<ReplicationResponse> response_envelope = response_result.GetValue();
ReplicationResponse response = response_envelope.message;
if (response.success) {
success = true;
break;
}
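// a failed response may carry retry_leader as a hint about whom to contact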
if (response.retry_leader) {
leader = response.retry_leader.value();
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
MG_ASSERT(success);
simulator.ShutDown();