Check-in skeleton for RSM logic on top of Raft

This commit is contained in:
Tyler Neely 2022-08-02 10:18:38 +00:00
parent 8be88deee6
commit 997fdf5a16
2 changed files with 198 additions and 84 deletions
src/io/rsm
tests/simulation

View File

@ -38,29 +38,32 @@ using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using Op = std::vector<uint8_t>;
using Term = uint64_t;
using LogIndex = uint64_t;
using Time = uint64_t;
using Duration = uint64_t;
using RequestId = uint64_t;
/// The request that a client sends to request that
/// the cluster replicates their data.
struct ReplicationRequest {
std::vector<uint8_t> opaque_data;
};
struct ReplicationResponse {
template <typename WriteValue>
struct WriteResponse {
bool success;
WriteValue write_value;
std::optional<Address> retry_leader;
};
template <typename ReadValue>
struct ReadResponse {
bool success;
ReadValue read_value;
std::optional<Address> retry_leader;
};
template <typename WriteRequest>
struct AppendRequest {
Term term = 0;
LogIndex last_log_index;
Term last_log_term;
std::vector<std::pair<Term, Op>> entries;
std::vector<std::pair<Term, WriteRequest>> entries;
LogIndex leader_commit;
};
@ -87,9 +90,10 @@ struct VoteResponse {
bool vote_granted = false;
};
template <typename WriteRequest>
struct CommonState {
Term term = 0;
std::vector<std::pair<Term, Op>> log;
std::vector<std::pair<Term, WriteRequest>> log;
LogIndex committed_log_size = 0;
LogIndex last_applied = 0;
};
@ -130,17 +134,48 @@ struct Follower {
using Role = std::variant<Candidate, Leader, Follower>;
template <typename IoImpl /*, typename ReplicatedState, ReplicatedStateMachine<ReplicatedState> Rsm*/>
/*
TODO make concept that expresses the fact that any RSM must
be able to have a specific type applied to it, returning
another interesting result type.
all ReplicatedState classes should have an apply method
that returns our WriteResponseValue:
ReadResponse read(ReadRequest);
WriteResponseValue ReplicatedState::apply(WriteRequest);
for examples:
if the state is uint64_t, and WriteRequest is `struct PlusOne {};`,
and WriteResponseValue is also uint64_t (the new value), then
each call to state.apply(PlusOne{}) will return the new value
after incrementing it. 0, 1, 2, 3... and this will be sent back
to the client that requested the mutation.
In practice, these mutations will usually be predicated on some
previous value, so that they are idempotent, functioning similarly
to a CAS operation.
template<typename Write, typename T, typename WriteResponse>
concept Rsm = requires(T t, Write w)
{
{ t.read(r) } -> std::same_as<ReadResponse>;
{ t.apply(w) } -> std::same_as<WriteResponseValue>;
};
*/
template <typename IoImpl, typename ReplicatedState, typename WriteRequest, typename WriteResponseValue,
typename ReadRequest, typename ReadResponseValue>
class Raft {
CommonState state_;
Role role_ = Candidate{};
Io<IoImpl> io_;
std::vector<Address> peers_;
// Rsm rsm_;
ReplicatedState replicated_state_;
public:
Raft(Io<IoImpl> &&io, std::vector<Address> peers /*, Rsm &&rsm */)
: io_(std::move(io)), peers_(peers) /*, rsm_(std::move(rsm)*/ {}
Raft(Io<IoImpl> &&io, std::vector<Address> peers, ReplicatedState &&replicated_state)
: io_(std::move(io)), peers_(peers) replicated_state_(std::move(replicated_state)) {}
void Run() {
Time last_cron = io_.Now();
@ -155,9 +190,8 @@ class Raft {
Duration receive_timeout = RandomTimeout(10000, 50000);
auto request_result =
io_.template ReceiveWithTimeout<AppendRequest, AppendResponse, ReplicationRequest, VoteRequest, VoteResponse>(
receive_timeout);
auto request_result = io_.template ReceiveWithTimeout<ReadRequest, AppendRequest, AppendResponse, WriteRequest,
VoteRequest, VoteResponse>(receive_timeout);
if (request_result.HasError()) {
continue;
}
@ -182,7 +216,17 @@ class Raft {
}
std::ranges::sort(indices, std::ranges::greater());
// assuming reverse sort (using std::ranges::greater)
state_.committed_log_size = indices[(indices.size() / 2)];
size_t new_committed_log_size = indices[(indices.size() / 2)];
// TODO(tyler / gabor) for each index between the old
// index and the new one, apply that log's WriteRequest
// to our replicated_state_, and use the specific return
// value of the ReplicatedState::apply method (WriteResponseValue)
// to respondto the requester.
//
// this will completely replace the while loop below
state_.committed_log_size = new_committed_log_size;
Log("committed_log_size is now ", state_.committed_log_size);
@ -190,7 +234,7 @@ class Raft {
const auto &front = leader.pending_client_requests.front();
if (front.log_index <= state_.committed_log_size) {
Log("responding SUCCESS to client");
ReplicationResponse rr{
WriteResponse rr{
.success = true,
.retry_leader = std::nullopt,
};
@ -208,7 +252,7 @@ class Raft {
for (auto &[address, follower] : followers) {
const LogIndex index = follower.confirmed_contiguous_index;
std::vector<std::pair<Term, Op>> entries;
std::vector<std::pair<Term, WriteRequest>> entries;
if (state_.log.size() > index) {
entries.insert(entries.begin(), state_.log.begin() + index, state_.log.end());
@ -273,6 +317,29 @@ class Raft {
return term;
}
template <typename... Ts>
void Log(Ts &&...args) {
const Time now = io_.Now();
const Term term = state_.term;
std::cout << '\t' << now << "\t" << term << "\t" << io_.GetAddress().last_known_port;
std::visit([&](auto &&role) { role.Print(); }, role_);
(std::cout << ... << args) << std::endl;
}
/////////////////////////////////////////////////////////////
/// Raft-related Cron methods
///
/// Cron + std::visit is how events are dispatched
/// to certain code based on Raft role.
///
/// Cron(role) takes as the first argument a reference to its
/// role, and as the second argument, the message that has
/// been received.
/////////////////////////////////////////////////////////////
/// Periodic protocol maintenance.
void Cron() {
// dispatch periodic logic based on our role to a specific Cron method.
@ -351,7 +418,9 @@ class Raft {
return std::nullopt;
}
/// **********************************************
/////////////////////////////////////////////////////////////
/// Raft-related Handle methods
///
/// Handle + std::visit is how events are dispatched
/// to certain code based on Raft role.
///
@ -359,10 +428,10 @@ class Raft {
/// takes as the first argument a reference
/// to its role, and as the second argument, the
/// message that has been received.
/// **********************************************
void Handle(
std::variant<AppendRequest, AppendResponse, ReplicationRequest, VoteRequest, VoteResponse> &&message_variant,
RequestId request_id, Address from_address) {
/////////////////////////////////////////////////////////////
void Handle(std::variant<AppendRequest, AppendResponse, Write, VoteRequest, VoteResponse> &&message_variant,
RequestId request_id, Address from_address) {
// dispatch the message to a handler based on our role,
// which can be specified in the Handle first argument,
// or it can be `auto` if it's a handler for several roles
@ -470,57 +539,6 @@ class Raft {
return std::nullopt;
}
// only leaders actually handle replication requests from clients
std::optional<Role> Handle(Leader &leader, ReplicationRequest &&req, RequestId request_id, Address from_address) {
Log("received ReplicationRequest");
// we are the leader. add item to log and send Append to peers
state_.log.emplace_back(std::pair(state_.term, std::move(req.opaque_data)));
PendingClientRequest pcr{
.log_index = state_.log.size(),
.request_id = request_id,
.address = from_address,
};
leader.pending_client_requests.push_back(pcr);
BroadcastAppendEntries(leader.followers);
// TODO(tyler) add message to pending requests buffer, reply asynchronously
return std::nullopt;
}
// Raft paper - 8
// When a client first starts up, it connects to a randomly chosen
// server. If the clients first choice is not the leader, that
// server will reject the clients request and supply information
// about the most recent leader it has heard from.
std::optional<Role> Handle(Follower &follower, ReplicationRequest &&req, RequestId request_id, Address from_address) {
auto res = ReplicationResponse{};
res.success = false;
Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port);
res.retry_leader = follower.leader_address;
io_.Send(from_address, request_id, res);
return std::nullopt;
}
std::optional<Role> Handle(Candidate &, ReplicationRequest &&req, RequestId request_id, Address from_address) {
Log("received ReplicationRequest - not redirecting because no Leader is known");
auto res = ReplicationResponse{};
res.success = false;
Cron();
io_.Send(from_address, request_id, res);
return std::nullopt;
}
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &role, AppendRequest &&req, RequestId request_id, Address from_address) {
AppendResponse res{
@ -635,16 +653,77 @@ class Raft {
return std::nullopt;
}
template <typename... Ts>
void Log(Ts &&...args) {
const Time now = io_.Now();
const Term term = state_.term;
/////////////////////////////////////////////////////////////
/// RSM-related handle methods
/////////////////////////////////////////////////////////////
std::cout << '\t' << now << "\t" << term << "\t" << io_.GetAddress().last_known_port;
// Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState
std::optional<Role> Handle(Leader &leader, ReadRequest &&res, RequestId request_id, Address from_address) {
// TODO(tyler / gabor) implement
return std::nullopt;
}
std::visit([&](auto &&role) { role.Print(); }, role_);
// Candidates should respond with a failure, similar to the Candidate + WriteRequest failure below
std::optional<Role> Handle(Candidate &, ReadRequest &&req, RequestId request_id, Address from_address) {
// TODO(tyler / gabor) implement
return std::nullopt;
}
(std::cout << ... << args) << std::endl;
// Leaders should respond with a redirection, similar to the Follower + WriteRequest response below
std::optional<Role> Handle(Follower &, ReadRequest &&req, RequestId request_id, Address from_address) {
// TODO(tyler / gabor) implement
return std::nullopt;
}
// Raft paper - 8
// When a client first starts up, it connects to a randomly chosen
// server. If the clients first choice is not the leader, that
// server will reject the clients request and supply information
// about the most recent leader it has heard from.
std::optional<Role> Handle(Follower &follower, WriteRequest &&req, RequestId request_id, Address from_address) {
auto res = WriteResponse{};
res.success = false;
Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port);
res.retry_leader = follower.leader_address;
io_.Send(from_address, request_id, res);
return std::nullopt;
}
std::optional<Role> Handle(Candidate &, WriteRequest &&req, RequestId request_id, Address from_address) {
Log("received WriteRequest - not redirecting because no Leader is known");
auto res = WriteResponse{};
res.success = false;
Cron();
io_.Send(from_address, request_id, res);
return std::nullopt;
}
// only leaders actually handle replication requests from clients
std::optional<Role> Handle(Leader &leader, WriteRequest &&req, RequestId request_id, Address from_address) {
Log("received WriteRequest");
// we are the leader. add item to log and send Append to peers
state_.log.emplace_back(std::pair(state_.term, std::move(req.opaque_data)));
PendingClientRequest pcr{
.log_index = state_.log.size() - 1,
.request_id = request_id,
.address = from_address,
};
leader.pending_client_requests.push_back(pcr);
BroadcastAppendEntries(leader.followers);
// TODO(tyler) add message to pending requests buffer, reply asynchronously
return std::nullopt;
}
};

View File

@ -32,8 +32,40 @@ using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
class TestState {
std::map<int, int> state_;
public:
ReadResponse read(ReadRequest request) {
// TODO(tyler / gabor) implement
}
CasResponse apply(CasRequest request) {
// TODO(tyler / gabor) implement
}
};
struct Cas {
int key;
int old_value;
int new_value;
};
struct CasResponse {
bool success;
std::optional<int> last_value;
};
struct ReadRequest {
int key;
};
struct ReadResponse {
int value;
};
template <typename IoImpl>
void RunRaft(Raft<IoImpl> server) {
void RunRaft(Raft<IoImpl, TestState, Cas, CasResponse, ReadRequest, ReadResponse> server) {
server.Run();
}
@ -63,6 +95,7 @@ void RunSimulation() {
std::vector<Address> srv_2_peers = {srv_addr_1, srv_addr_3};
std::vector<Address> srv_3_peers = {srv_addr_1, srv_addr_2};
// TODO(tyler / gabor) supply default TestState to Raft constructor
Raft srv_1{std::move(srv_io_1), srv_1_peers};
Raft srv_2{std::move(srv_io_2), srv_2_peers};
Raft srv_3{std::move(srv_io_3), srv_3_peers};
@ -88,6 +121,8 @@ void RunSimulation() {
ReplicationRequest cli_req;
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
// TODO(tyler / gabor) replace Replication* with Cas/Read
std::cout << "client sending ReplicationRequest to Leader " << leader.last_known_port << std::endl;
ResponseFuture<ReplicationResponse> response_future =
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(leader, cli_req, 50000);