Merge branch 'T0941-MG-implement-basic-raft-version' of github.com:memgraph/memgraph into T0912-MG-in-memory-shard-map

This commit is contained in:
Tyler Neely 2022-08-16 16:51:09 +00:00
commit f0dc0d911d
2 changed files with 36 additions and 33 deletions
src/io/rsm
tests/simulation

View File

@ -135,7 +135,7 @@ struct Leader {
std::unordered_map<LogIndex, PendingClientRequest> pending_client_requests;
Time last_broadcast = Time::min();
static void Print() { std::cout << "\tLeader \t"; }
std::string ToString() { return "\tLeader \t"; }
};
struct Candidate {
@ -143,14 +143,14 @@ struct Candidate {
Time election_began = Time::min();
std::set<Address> outstanding_votes;
static void Print() { std::cout << "\tCandidate\t"; }
std::string ToString() { return "\tCandidate\t"; }
};
struct Follower {
Time last_received_append_entries_timestamp;
Address leader_address;
static void Print() { std::cout << "\tFollower \t"; }
std::string ToString() { return "\tFollower \t"; }
};
using Role = std::variant<Candidate, Leader, Follower>;
@ -189,7 +189,7 @@ concept Rsm = requires(ReplicatedState state, WriteOperation w, ReadOperation r)
/// ReplicatedState the high-level data structure that is managed by the raft-backed replicated state machine
/// WriteOperation the individual operation type that is applied to the ReplicatedState in identical order
/// across each replica
/// WriteResponseValue the return value of calling ReplicatedState::write(WriteOperation), which is executed in
/// WriteResponseValue the return value of calling ReplicatedState::Apply(WriteOperation), which is executed in
/// identical order across all replicas after an WriteOperation reaches consensus.
/// ReadOperation the type of operations that do not require consensus before executing directly
/// on a const ReplicatedState &
@ -292,7 +292,7 @@ class Raft {
WriteResponse<WriteResponseValue> resp;
resp.success = true;
resp.write_return = write_return;
resp.write_return = std::move(write_return);
io_.Send(client_request.address, client_request.request_id, std::move(resp));
leader.pending_client_requests.erase(apply_index);
@ -328,7 +328,7 @@ class Raft {
};
// request_id not necessary to set because it's not a Future-backed Request.
const RequestId request_id = 0;
static constexpr RequestId request_id = 0;
io_.Send(address, request_id, ar);
}
@ -337,10 +337,7 @@ class Raft {
// Raft paper - 5.2
// Raft uses randomized election timeouts to ensure that split votes are rare and that they are resolved quickly
Duration RandomTimeout(Duration min, Duration max) {
auto min_micros = std::chrono::duration_cast<std::chrono::milliseconds>(min).count();
auto max_micros = std::chrono::duration_cast<std::chrono::milliseconds>(max).count();
std::uniform_int_distribution time_distrib(min_micros, max_micros);
std::uniform_int_distribution time_distrib(min.count(), max.count());
auto rand_micros = io_.Rand(time_distrib);
@ -355,12 +352,12 @@ class Raft {
return std::chrono::microseconds{rand_micros};
}
Term PreviousTermFromIndex(LogIndex index) {
Term PreviousTermFromIndex(LogIndex index) const {
if (index == 0 || state_.log.size() + 1 <= index) {
return 0;
}
auto &[term, data] = state_.log.at(index - 1);
const auto &[term, data] = state_.log.at(index - 1);
return term;
}
@ -378,12 +375,12 @@ class Raft {
LogIndex LastLogIndex() { return state_.log.size(); }
Term LastLogTerm() {
Term LastLogTerm() const {
if (state_.log.empty()) {
return 0;
}
auto &[term, data] = state_.log.back();
const auto &[term, data] = state_.log.back();
return term;
}
@ -394,11 +391,17 @@ class Raft {
const Term term = state_.term;
std::cout << '\t' << micros << "\t" << term << "\t" << io_.GetAddress().last_known_port;
std::ostringstream out;
std::visit([&](auto &&role) { role.Print(); }, role_);
out << '\t' << (int)micros << "\t" << term << "\t" << io_.GetAddress().last_known_port;
(std::cout << ... << args) << std::endl;
std::string role_string = std::visit([&](auto &&role) { return role.ToString(); }, role_);
out << role_string;
(out << ... << args);
spdlog::debug(out.str());
}
/////////////////////////////////////////////////////////////
@ -430,7 +433,7 @@ class Raft {
std::optional<Role> Cron(Candidate &candidate) {
const auto now = io_.Now();
const Duration election_timeout = RandomTimeout(100000, 200000);
auto election_timeout_us = std::chrono::duration_cast<std::chrono::milliseconds>(election_timeout).count();
const auto election_timeout_us = std::chrono::duration_cast<std::chrono::milliseconds>(election_timeout).count();
if (now - candidate.election_began > election_timeout) {
state_.term++;
@ -447,7 +450,7 @@ class Raft {
for (const auto &peer : peers_) {
// request_id not necessary to set because it's not a Future-backed Request.
auto request_id = 0;
static constexpr auto request_id = 0;
io_.template Send<VoteRequest>(peer, request_id, request);
outstanding_votes.insert(peer);
}

View File

@ -162,7 +162,7 @@ void RunSimulation() {
auto srv_thread_3 = std::jthread(RunRaft<SimulatorTransport>, std::move(srv_3));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_3);
std::cout << "beginning test after servers have become quiescent" << std::endl;
spdlog::debug("beginning test after servers have become quiescent");
std::mt19937 cli_rng_{0};
std::vector<Address> server_addrs{srv_addr_1, srv_addr_2, srv_addr_3};
@ -197,8 +197,8 @@ void RunSimulation() {
bool cas_succeeded = cas_response.cas_success;
std::cout << "Client received CasResponse! success: " << cas_succeeded
<< " last_known_value: " << (int)*last_known_value << std::endl;
spdlog::debug("Client received CasResponse! success: {} last_known_value {}", cas_succeeded,
(int)*last_known_value);
if (cas_succeeded) {
last_known_value = i;
@ -223,7 +223,7 @@ void RunSimulation() {
MG_ASSERT(get_response.value == i);
std::cout << "client successfully cas'd a value and read it back! value: " << i << std::endl;
spdlog::debug("client successfully cas'd a value and read it back! value: {}", i);
success = true;
}
@ -234,14 +234,14 @@ void RunSimulation() {
SimulatorStats stats = simulator.Stats();
std::cout << "total messages: " << stats.total_messages << std::endl;
std::cout << "dropped messages: " << stats.dropped_messages << std::endl;
std::cout << "timed out requests: " << stats.timed_out_requests << std::endl;
std::cout << "total requests: " << stats.total_requests << std::endl;
std::cout << "total responses: " << stats.total_responses << std::endl;
std::cout << "simulator ticks: " << stats.simulator_ticks << std::endl;
spdlog::debug("total messages: ", stats.total_messages);
spdlog::debug("dropped messages: ", stats.dropped_messages);
spdlog::debug("timed out requests: ", stats.timed_out_requests);
spdlog::debug("total requests: ", stats.total_requests);
spdlog::debug("total responses: ", stats.total_responses);
spdlog::debug("simulator ticks: ", stats.simulator_ticks);
std::cout << "========================== SUCCESS :) ==========================" << std::endl;
spdlog::debug("========================== SUCCESS :) ==========================");
/*
this is implicit in jthread's dtor
@ -255,12 +255,12 @@ int main() {
int n_tests = 50;
for (int i = 0; i < n_tests; i++) {
std::cout << "========================== NEW SIMULATION " << i << " ==========================" << std::endl;
std::cout << "\tTime\tTerm\tPort\tRole\t\tMessage\n";
spdlog::debug("========================== NEW SIMULATION {} ==========================", i);
spdlog::debug("\tTime\tTerm\tPort\tRole\t\tMessage\n");
RunSimulation();
}
std::cout << "passed " << n_tests << " tests!" << std::endl;
spdlog::debug("passed {} tests!", n_tests);
return 0;
}