Make Raft index tracking less bug-prone by focusing on "log size" which gracefully initializes to 0 instead of 1-indexed log index tracking which does not cleanly initialize as intuitively

This commit is contained in:
Tyler Neely 2022-08-29 09:18:25 +00:00
parent 10f8af4681
commit f6e41bd0f5
2 changed files with 112 additions and 94 deletions
src/io/rsm
tests/simulation

View File

@ -27,7 +27,7 @@
namespace memgraph::io::rsm {
/// Timeout tunables
/// Timeout and replication tunables
using namespace std::chrono_literals;
static constexpr auto kMinimumElectionTimeout = 100ms;
static constexpr auto kMaximumElectionTimeout = 200ms;
@ -47,9 +47,11 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval,
"The minimum cron interval has to be smaller than the maximum cron interval!");
static_assert(kMinimumReceiveTimeout < kMaximumReceiveTimeout,
"The minimum receive timeout has to be smaller than the maximum receive timeout!");
static constexpr size_t kMaximumAppendBatchSize = 1024;
using Term = uint64_t;
using LogIndex = uint64_t;
using LogSize = uint64_t;
using RequestId = uint64_t;
template <typename WriteOperation>
@ -71,6 +73,7 @@ struct WriteResponse {
bool success;
WriteReturn write_return;
std::optional<Address> retry_leader;
LogIndex raft_index;
};
template <typename ReadOperation>
@ -96,10 +99,10 @@ struct ReadResponse {
template <typename WriteRequest>
struct AppendRequest {
Term term = 0;
LogIndex last_log_index;
LogIndex batch_start_log_index;
Term last_log_term;
std::vector<std::pair<Term, WriteRequest>> entries;
LogIndex leader_commit;
LogSize leader_commit;
};
struct AppendResponse {
@ -110,18 +113,18 @@ struct AppendResponse {
// the leader the offset that we are interested in
// to send log offsets from for us. This will only
// be useful at the beginning of a leader's term.
LogIndex last_log_index;
LogSize log_size;
};
struct VoteRequest {
Term term = 0;
LogIndex last_log_index;
LogSize log_size;
Term last_log_term;
};
struct VoteResponse {
Term term = 0;
LogIndex committed_log_size;
LogSize committed_log_size;
bool vote_granted = false;
};
@ -129,13 +132,13 @@ template <typename WriteRequest>
struct CommonState {
Term term = 0;
std::vector<std::pair<Term, WriteRequest>> log;
LogIndex committed_log_size = 0;
LogIndex applied_size = 0;
LogSize committed_log_size = 0;
LogSize applied_size = 0;
};
struct FollowerTracker {
LogIndex next_index = 0;
LogIndex confirmed_contiguous_index = 0;
LogSize confirmed_log_size = 0;
};
struct PendingClientRequest {
@ -153,7 +156,7 @@ struct Leader {
};
struct Candidate {
std::map<Address, LogIndex> successful_votes;
std::map<Address, LogSize> successful_votes;
Time election_began = Time::min();
std::set<Address> outstanding_votes;
@ -257,41 +260,45 @@ class Raft {
// "Safely replicated" is defined as being known to be present
// on at least a majority of all peers (inclusive of the Leader).
void BumpCommitIndexAndReplyToClients(Leader &leader) {
auto indices = std::vector<LogIndex>{};
auto confirmed_log_sizes = std::vector<LogSize>{};
// We include our own log size in the calculation of the log
// index that is present on at least a majority of all peers.
indices.push_back(state_.log.size());
// confirmed log size that is present on at least a majority of all peers.
confirmed_log_sizes.push_back(state_.log.size());
for (const auto &[addr, f] : leader.followers) {
indices.push_back(f.confirmed_contiguous_index);
Log("at port ", addr.last_known_port, " has confirmed contiguous index of: ", f.confirmed_contiguous_index);
confirmed_log_sizes.push_back(f.confirmed_log_size);
Log("Follower at port ", addr.last_known_port, " has confirmed log size of: ", f.confirmed_log_size);
}
// reverse sort from highest to lowest (using std::ranges::greater)
std::ranges::sort(indices, std::ranges::greater());
std::ranges::sort(confirmed_log_sizes, std::ranges::greater());
// This is a particularly correctness-critical calculation because it
// determines which index we will consider to be the committed index.
// determines the committed log size that will be broadcast in
// the next AppendRequest.
//
// If the following indexes are recorded for clusters of different sizes,
// these are the expected indexes that are considered to have reached
// consensus:
// state | expected value | (indices.size() / 2)
// If the following sizes are recorded for clusters of different numbers of peers,
// these are the expected sizes that are considered to have reached consensus:
//
// state | expected value | (confirmed_log_sizes.size() / 2)
// [1] 1 (1 / 2) => 0
// [2, 1] 1 (2 / 2) => 1
// [3, 2, 1] 2 (3 / 2) => 1
// [4, 3, 2, 1] 2 (4 / 2) => 2
// [5, 4, 3, 2, 1] 3 (5 / 2) => 2
size_t index_present_on_majority = indices.size() / 2;
LogIndex new_committed_log_size = indices[index_present_on_majority];
size_t majority_index = confirmed_log_sizes.size() / 2;
LogSize new_committed_log_size = confirmed_log_sizes[majority_index];
// We never go backwards in history.
MG_ASSERT(state_.committed_log_size <= new_committed_log_size);
MG_ASSERT(state_.committed_log_size <= new_committed_log_size,
"as a Leader, we have previously set our committed_log_size to {}, but our Followers have a majority "
"committed_log_size of {}",
state_.committed_log_size, new_committed_log_size);
state_.committed_log_size = new_committed_log_size;
// For each index between the old index and the new one (inclusive),
// For each size between the old size and the new one (inclusive),
// Apply that log's WriteOperation to our replicated_state_,
// and use the specific return value of the ReplicatedState::Apply
// method (WriteResponseValue) to respond to the requester.
@ -307,6 +314,7 @@ class Raft {
WriteResponse<WriteResponseValue> resp{
.success = true,
.write_return = std::move(write_return),
.raft_index = apply_index,
};
io_.Send(client_request.address, client_request.request_id, std::move(resp));
@ -320,31 +328,34 @@ class Raft {
// AppendEntries RPCs are initiated by leaders to replicate log entries and to provide a form of heartbeat
void BroadcastAppendEntries(std::map<Address, FollowerTracker> &followers) {
for (auto &[address, follower] : followers) {
const LogIndex index = follower.confirmed_contiguous_index;
const LogSize follower_log_size = follower.confirmed_log_size;
std::vector<std::pair<Term, WriteOperation>> entries;
if (state_.log.size() > index) {
entries.insert(entries.begin(), state_.log.begin() + index, state_.log.end());
}
auto missing = state_.log.size() - follower_log_size;
auto batch_size = std::min(missing, kMaximumAppendBatchSize);
auto start_index = follower_log_size;
auto end_index = start_index + batch_size;
const Term previous_term_from_index = PreviousTermFromIndex(index);
entries.insert(entries.begin(), state_.log.begin() + start_index, state_.log.begin() + end_index);
const Term previous_term_from_index = PreviousTermFromIndex(start_index);
Log("sending ", entries.size(), " entries to Follower ", address.last_known_port,
" which are above its known index of ", index);
" which are above its known confirmed log size of ", follower_log_size);
AppendRequest<WriteOperation> ar{
.term = state_.term,
.last_log_index = index,
.batch_start_log_index = start_index,
.last_log_term = previous_term_from_index,
.entries = entries,
.entries = std::move(entries),
.leader_commit = state_.committed_log_size,
};
// request_id not necessary to set because it's not a Future-backed Request.
static constexpr RequestId request_id = 0;
io_.Send(address, request_id, ar);
io_.Send(address, request_id, std::move(ar));
}
}
@ -375,8 +386,6 @@ class Raft {
return term;
}
LogIndex CommittedLogIndex() { return state_.committed_log_size; }
Term CommittedLogTerm() {
MG_ASSERT(state_.log.size() >= state_.committed_log_size);
if (state_.log.empty() || state_.committed_log_size == 0) {
@ -387,8 +396,6 @@ class Raft {
return term;
}
LogIndex LastLogIndex() { return state_.log.size(); }
Term LastLogTerm() const {
if (state_.log.empty()) {
return 0;
@ -415,7 +422,7 @@ class Raft {
(out << ... << args);
spdlog::debug(out.str());
spdlog::info(out.str());
}
/////////////////////////////////////////////////////////////
@ -452,11 +459,11 @@ class Raft {
if (now - candidate.election_began > election_timeout) {
state_.term++;
Log("becoming Candidate for term ", state_.term, " after leader timeout of ", election_timeout_us,
" elapsed since last election attempt");
"ms elapsed since last election attempt");
const VoteRequest request{
.term = state_.term,
.last_log_index = LastLogIndex(),
.log_size = state_.log.size(),
.last_log_term = LastLogTerm(),
};
@ -546,11 +553,11 @@ class Raft {
// all roles can receive Vote and possibly become a follower
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &, VoteRequest &&req, RequestId request_id, Address from_address) {
Log("received Vote from ", from_address.last_known_port, " with term ", req.term);
Log("received VoteRequest from ", from_address.last_known_port, " with term ", req.term);
const bool last_log_term_dominates = req.last_log_term >= LastLogTerm();
const bool term_dominates = req.term > state_.term;
const bool last_log_index_dominates = req.last_log_index >= LastLogIndex();
const bool new_leader = last_log_term_dominates && term_dominates && last_log_index_dominates;
const bool log_size_dominates = req.log_size >= state_.log.size();
const bool new_leader = last_log_term_dominates && term_dominates && log_size_dominates;
if (new_leader) {
MG_ASSERT(req.term > state_.term);
@ -606,14 +613,14 @@ class Raft {
for (const auto &[address, committed_log_size] : candidate.successful_votes) {
FollowerTracker follower{
.next_index = committed_log_size,
.confirmed_contiguous_index = committed_log_size,
.confirmed_log_size = committed_log_size,
};
followers.insert({address, follower});
}
for (const auto &address : candidate.outstanding_votes) {
FollowerTracker follower{
.next_index = state_.log.size(),
.confirmed_contiguous_index = 0,
.confirmed_log_size = 0,
};
followers.insert({address, follower});
}
@ -640,11 +647,13 @@ class Raft {
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &role, AppendRequest<WriteOperation> &&req, RequestId request_id,
Address from_address) {
// log size starts out as state_.committed_log_size and only if everything is successful do we
// switch it to the log length.
AppendResponse res{
.success = false,
.term = state_.term,
.last_log_term = CommittedLogTerm(),
.last_log_index = CommittedLogIndex(),
.log_size = state_.log.size(),
};
if constexpr (std::is_same<AllRoles, Leader>()) {
@ -689,33 +698,32 @@ class Raft {
MG_ASSERT(false, "Somehow entered Follower-specific logic as a non-Follower");
}
res.last_log_term = LastLogTerm();
res.last_log_index = LastLogIndex();
Log("returning last_log_index of ", res.last_log_index);
// Handle steady-state conditions.
if (req.last_log_index != LastLogIndex()) {
Log("req.last_log_index is above our last applied log index");
if (req.batch_start_log_index != state_.log.size()) {
Log("req.batch_start_log_index of ", req.batch_start_log_index, " does not match our log size of ",
state_.log.size());
} else if (req.last_log_term != LastLogTerm()) {
Log("req.last_log_term differs from our leader term at that slot, expected: ", LastLogTerm(), " but got ",
req.last_log_term);
} else {
// happy path - Apply log
Log("applying batch of entries to log of size ", req.entries.size());
Log("applying batch of ", req.entries.size(), " entries to our log starting at index ",
req.batch_start_log_index);
MG_ASSERT(req.last_log_index >= state_.committed_log_size,
auto resize_length = req.batch_start_log_index;
MG_ASSERT(resize_length >= state_.committed_log_size,
"Applied history from Leader which goes back in time from our commit_index");
// possibly chop-off stuff that was replaced by
// things with different terms (we got data that
// hasn't reached consensus yet, which is normal)
state_.log.resize(req.last_log_index);
state_.log.resize(resize_length);
state_.log.insert(state_.log.end(), req.entries.begin(), req.entries.end());
MG_ASSERT(req.leader_commit >= state_.committed_log_size);
state_.committed_log_size = std::min(req.leader_commit, LastLogIndex());
state_.committed_log_size = std::min(req.leader_commit, state_.log.size());
for (; state_.applied_size < state_.committed_log_size; state_.applied_size++) {
const auto &write_request = state_.log[state_.applied_size].second;
@ -725,6 +733,11 @@ class Raft {
res.success = true;
}
res.last_log_term = LastLogTerm();
res.log_size = state_.log.size();
Log("returning log_size of ", res.log_size);
io_.Send(from_address, request_id, res);
return std::nullopt;
@ -732,20 +745,26 @@ class Raft {
std::optional<Role> Handle(Leader &leader, AppendResponse &&res, RequestId, Address from_address) {
if (res.term != state_.term) {
Log("received AppendResponse related to a previous term when we (presumably) were the leader");
} else if (!leader.followers.contains(from_address)) {
Log("received AppendResponse from unknown Follower");
MG_ASSERT(false, "received AppendResponse from unknown Follower");
// TODO(tyler) when we have dynamic membership, this assert will become incorrect, but we should
// keep it in-place until then because it has bug finding value.
} else {
if (res.success) {
Log("got successful AppendResponse from ", from_address.last_known_port, " with last_log_index of ",
res.last_log_index);
} else {
Log("got unsuccessful AppendResponse from ", from_address.last_known_port, " with last_log_index of ",
res.last_log_index);
}
// term matches and we know this Follower
FollowerTracker &follower = leader.followers.at(from_address);
follower.next_index = std::max(follower.next_index, res.last_log_index);
follower.confirmed_contiguous_index = std::max(follower.confirmed_contiguous_index, res.last_log_index);
if (res.success) {
Log("got successful AppendResponse from ", from_address.last_known_port, " with log_size of ", res.log_size);
follower.next_index = std::max(follower.next_index, res.log_size);
} else {
Log("got unsuccessful AppendResponse from ", from_address.last_known_port, " with log_size of ", res.log_size);
follower.next_index = res.log_size;
}
follower.confirmed_log_size = std::max(follower.confirmed_log_size, res.log_size);
BumpCommitIndexAndReplyToClients(leader);
}

View File

@ -128,7 +128,7 @@ void RunSimulation() {
.scramble_messages = true,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{8 * 1024 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{8 * 1024 * 128},
};
auto simulator = Simulator(config);
@ -162,14 +162,14 @@ void RunSimulation() {
auto srv_thread_3 = std::jthread(RunRaft<SimulatorTransport>, std::move(srv_3));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_3);
spdlog::debug("beginning test after servers have become quiescent");
spdlog::info("beginning test after servers have become quiescent");
std::mt19937 cli_rng_{0};
Address server_addrs[]{srv_addr_1, srv_addr_2, srv_addr_3};
Address leader = server_addrs[0];
const int key = 0;
std::optional<int> last_known_value;
std::optional<int> last_known_value = 0;
bool success = false;
@ -185,7 +185,7 @@ void RunSimulation() {
WriteRequest<CasRequest> cli_req;
cli_req.operation = cas_req;
spdlog::debug("client sending CasRequest to Leader {} ", leader.last_known_port);
spdlog::info("client sending CasRequest to Leader {} ", leader.last_known_port);
ResponseFuture<WriteResponse<CasResponse>> cas_response_future =
cli_io.Request<WriteRequest<CasRequest>, WriteResponse<CasResponse>>(leader, cli_req);
@ -193,8 +193,8 @@ void RunSimulation() {
ResponseResult<WriteResponse<CasResponse>> cas_response_result = std::move(cas_response_future).Wait();
if (cas_response_result.HasError()) {
spdlog::debug("client timed out while trying to communicate with assumed Leader server {}",
leader.last_known_port);
spdlog::info("client timed out while trying to communicate with assumed Leader server {}",
leader.last_known_port);
continue;
}
@ -204,14 +204,14 @@ void RunSimulation() {
if (write_cas_response.retry_leader) {
MG_ASSERT(!write_cas_response.success, "retry_leader should never be set for successful responses");
leader = write_cas_response.retry_leader.value();
spdlog::debug("client redirected to leader server {}", leader.last_known_port);
spdlog::info("client redirected to leader server {}", leader.last_known_port);
} else if (!write_cas_response.success) {
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
size_t addr_index = addr_distrib(cli_rng_);
leader = server_addrs[addr_index];
spdlog::debug("client NOT redirected to leader server, trying a random one at index {} with port {}", addr_index,
leader.last_known_port);
spdlog::info("client NOT redirected to leader server, trying a random one at index {} with port {}", addr_index,
leader.last_known_port);
continue;
}
@ -219,8 +219,7 @@ void RunSimulation() {
bool cas_succeeded = cas_response.cas_success;
spdlog::debug("Client received CasResponse! success: {} last_known_value {}", cas_succeeded,
(int)*last_known_value);
spdlog::info("Client received CasResponse! success: {} last_known_value {}", cas_succeeded, (int)*last_known_value);
if (cas_succeeded) {
last_known_value = i;
@ -235,7 +234,7 @@ void RunSimulation() {
ReadRequest<GetRequest> read_req;
read_req.operation = get_req;
spdlog::debug("client sending GetRequest to Leader {}", leader.last_known_port);
spdlog::info("client sending GetRequest to Leader {}", leader.last_known_port);
ResponseFuture<ReadResponse<GetResponse>> get_response_future =
cli_io.Request<ReadRequest<GetRequest>, ReadResponse<GetResponse>>(leader, read_req);
@ -244,7 +243,7 @@ void RunSimulation() {
ResponseResult<ReadResponse<GetResponse>> get_response_result = std::move(get_response_future).Wait();
if (get_response_result.HasError()) {
spdlog::debug("client timed out while trying to communicate with Leader server {}", leader.last_known_port);
spdlog::info("client timed out while trying to communicate with Leader server {}", leader.last_known_port);
continue;
}
@ -259,21 +258,21 @@ void RunSimulation() {
if (read_get_response.retry_leader) {
MG_ASSERT(!read_get_response.success, "retry_leader should never be set for successful responses");
leader = read_get_response.retry_leader.value();
spdlog::debug("client redirected to Leader server {}", leader.last_known_port);
spdlog::info("client redirected to Leader server {}", leader.last_known_port);
} else if (!read_get_response.success) {
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
size_t addr_index = addr_distrib(cli_rng_);
leader = server_addrs[addr_index];
spdlog::debug("client NOT redirected to leader server, trying a random one at index {} with port {}", addr_index,
leader.last_known_port);
spdlog::info("client NOT redirected to leader server, trying a random one at index {} with port {}", addr_index,
leader.last_known_port);
}
GetResponse get_response = read_get_response.read_return;
MG_ASSERT(get_response.value == i);
spdlog::debug("client successfully cas'd a value and read it back! value: {}", i);
spdlog::info("client successfully cas'd a value and read it back! value: {}", i);
success = true;
}
@ -284,14 +283,14 @@ void RunSimulation() {
SimulatorStats stats = simulator.Stats();
spdlog::debug("total messages: ", stats.total_messages);
spdlog::debug("dropped messages: ", stats.dropped_messages);
spdlog::debug("timed out requests: ", stats.timed_out_requests);
spdlog::debug("total requests: ", stats.total_requests);
spdlog::debug("total responses: ", stats.total_responses);
spdlog::debug("simulator ticks: ", stats.simulator_ticks);
spdlog::info("total messages: {}", stats.total_messages);
spdlog::info("dropped messages: {}", stats.dropped_messages);
spdlog::info("timed out requests: {}", stats.timed_out_requests);
spdlog::info("total requests: {}", stats.total_requests);
spdlog::info("total responses: {}", stats.total_responses);
spdlog::info("simulator ticks: {}", stats.simulator_ticks);
spdlog::debug("========================== SUCCESS :) ==========================");
spdlog::info("========================== SUCCESS :) ==========================");
/*
this is implicit in jthread's dtor
@ -305,12 +304,12 @@ int main() {
int n_tests = 50;
for (int i = 0; i < n_tests; i++) {
spdlog::debug("========================== NEW SIMULATION {} ==========================", i);
spdlog::debug("\tTime\tTerm\tPort\tRole\t\tMessage\n");
spdlog::info("========================== NEW SIMULATION {} ==========================", i);
spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
RunSimulation();
}
spdlog::debug("passed {} tests!", n_tests);
spdlog::info("passed {} tests!", n_tests);
return 0;
}