Update raft implementation to use std::chrono like upstream

This commit is contained in:
Tyler Neely 2022-08-04 13:28:52 +00:00
parent 618a3d96b3
commit 0a43afdec1
3 changed files with 36 additions and 14 deletions
src/io/rsm
tests/simulation

View File

@ -29,10 +29,12 @@
namespace memgraph::io::rsm {
using memgraph::io::Address;
using memgraph::io::Duration;
using memgraph::io::Io;
using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::Time;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
@ -40,8 +42,6 @@ using memgraph::io::simulator::SimulatorTransport;
using Term = uint64_t;
using LogIndex = uint64_t;
using Time = uint64_t;
using Duration = uint64_t;
using RequestId = uint64_t;
template <typename WriteOperation>
@ -132,14 +132,14 @@ struct PendingClientRequest {
struct Leader {
std::map<Address, FollowerTracker> followers;
std::deque<PendingClientRequest> pending_client_requests;
Time last_broadcast = 0;
Time last_broadcast = Time::min();
void Print() { std::cout << "\tLeader \t"; }
};
struct Candidate {
std::map<Address, LogIndex> successful_votes;
Time election_began = 0;
Time election_began = Time::min();
std::set<Address> outstanding_votes;
void Print() { std::cout << "\tCandidate\t"; }
@ -327,8 +327,22 @@ class Raft {
// Raft paper - 5.2
// Raft uses randomized election timeouts to ensure that split votes are rare and that they are resolved quickly
Duration RandomTimeout(Duration min, Duration max) {
std::uniform_int_distribution time_distrib(min, max);
return io_.Rand(time_distrib);
auto min_micros = std::chrono::duration_cast<std::chrono::milliseconds>(min).count();
auto max_micros = std::chrono::duration_cast<std::chrono::milliseconds>(max).count();
std::uniform_int_distribution time_distrib(min_micros, max_micros);
auto rand_micros = io_.Rand(time_distrib);
return std::chrono::microseconds{rand_micros};
}
Duration RandomTimeout(int min_micros, int max_micros) {
std::uniform_int_distribution time_distrib(min_micros, max_micros);
int rand_micros = io_.Rand(time_distrib);
return std::chrono::microseconds{rand_micros};
}
Term PreviousTermFromIndex(LogIndex index) {
@ -366,9 +380,11 @@ class Raft {
template <typename... Ts>
void Log(Ts &&...args) {
const Time now = io_.Now();
auto micros = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
const Term term = state_.term;
std::cout << '\t' << now << "\t" << term << "\t" << io_.GetAddress().last_known_port;
std::cout << '\t' << micros << "\t" << term << "\t" << io_.GetAddress().last_known_port;
std::visit([&](auto &&role) { role.Print(); }, role_);
@ -404,10 +420,11 @@ class Raft {
std::optional<Role> Cron(Candidate &candidate) {
const auto now = io_.Now();
const Duration election_timeout = RandomTimeout(100000, 200000);
auto election_timeout_us = std::chrono::duration_cast<std::chrono::milliseconds>(election_timeout).count();
if (now - candidate.election_began > election_timeout) {
state_.term++;
Log("becoming Candidate for term ", state_.term, " after leader timeout of ", election_timeout,
Log("becoming Candidate for term ", state_.term, " after leader timeout of ", election_timeout_us,
" elapsed since last election attempt");
const VoteRequest request{

View File

@ -27,4 +27,6 @@ endfunction(add_simulation_test)
add_simulation_test(basic_request.cpp address)
add_simulation_test(raft.cpp address)
add_simulation_test(trial_query_storage/query_storage_test.cpp address)

View File

@ -9,6 +9,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <deque>
#include <iostream>
#include <map>
@ -23,10 +24,12 @@
#include "io/simulator/simulator_transport.hpp"
using memgraph::io::Address;
using memgraph::io::Duration;
using memgraph::io::Io;
using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::Time;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
@ -124,8 +127,8 @@ void RunSimulation() {
.perform_timeouts = true,
.scramble_messages = true,
.rng_seed = 0,
.start_time = 256 * 1024,
.abort_time = 8 * 1024 * 1024,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{8 * 1024 * 1024},
};
auto simulator = Simulator(config);
@ -184,10 +187,10 @@ void RunSimulation() {
std::cout << "client sending CasRequest to Leader " << leader.last_known_port << std::endl;
ResponseFuture<WriteResponse<CasResponse>> cas_response_future =
cli_io.RequestWithTimeout<WriteRequest<CasRequest>, WriteResponse<CasResponse>>(leader, cli_req, 50000);
cli_io.Request<WriteRequest<CasRequest>, WriteResponse<CasResponse>>(leader, cli_req);
// receive cas_response
ResponseResult<WriteResponse<CasResponse>> cas_response_result = cas_response_future.Wait();
ResponseResult<WriteResponse<CasResponse>> cas_response_result = std::move(cas_response_future).Wait();
if (cas_response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
@ -233,10 +236,10 @@ void RunSimulation() {
std::cout << "client sending GetRequest to Leader " << leader.last_known_port << std::endl;
ResponseFuture<ReadResponse<GetResponse>> get_response_future =
cli_io.RequestWithTimeout<ReadRequest<GetRequest>, ReadResponse<GetResponse>>(leader, read_req, 50000);
cli_io.Request<ReadRequest<GetRequest>, ReadResponse<GetResponse>>(leader, read_req);
// receive response
ResponseResult<ReadResponse<GetResponse>> get_response_result = get_response_future.Wait();
ResponseResult<ReadResponse<GetResponse>> get_response_result = std::move(get_response_future).Wait();
if (get_response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;