Hoist read and write requests on the RSM into wrapper structs for futureproofing

This commit is contained in:
Tyler Neely 2022-08-03 14:21:37 +00:00
parent 74b354979c
commit aebac2c519
2 changed files with 92 additions and 51 deletions
src/io/rsm
tests/simulation

View File

@ -44,26 +44,36 @@ using Time = uint64_t;
using Duration = uint64_t;
using RequestId = uint64_t;
template <typename WriteOperation>
struct WriteRequest {
WriteOperation operation;
};
/// WriteResponse is returned to a client after
/// their WriteRequest was entered in to the raft
/// log and it reached consensus.
///
/// WriteValue is the result of applying the WriteRequest to
/// WriteReturn is the result of applying the WriteRequest to
/// ReplicatedState, and if the ReplicatedState::write
/// method is deterministic, all replicas will
/// have the same ReplicatedState after applying
/// the submitted WriteRequest.
template <typename WriteValue>
template <typename WriteReturn>
struct WriteResponse {
bool success;
WriteValue write_value;
WriteReturn write_return;
std::optional<Address> retry_leader;
};
template <typename ReadValue>
template <typename ReadOperation>
struct ReadRequest {
ReadOperation operation;
};
template <typename ReadReturn>
struct ReadResponse {
bool success;
ReadValue read_value;
ReadReturn read_return;
std::optional<Address> retry_leader;
};
@ -152,7 +162,7 @@ another interesting result type.
all ReplicatedState classes should have an apply method
that returns our WriteResponseValue:
ReadResponse read(ReadRequest);
ReadResponse read(ReadOperation);
WriteResponseValue ReplicatedState::apply(WriteRequest);
for examples:
@ -174,18 +184,30 @@ concept Rsm = requires(T t, Write w)
};
*/
template <typename WriteRequest, typename ReadRequest, typename T, typename WriteResponseValue,
template <typename WriteRequest, typename ReadOperation, typename T, typename WriteResponseValue,
typename ReadResponseValue>
concept Rsm = requires(T t, WriteRequest w, ReadRequest r) {
concept Rsm = requires(T t, WriteRequest w, ReadOperation r) {
{ t.read(r) } -> std::same_as<ReadResponseValue>;
{ t.apply(w) } -> std::same_as<WriteResponseValue>;
};
template <typename IoImpl, typename ReplicatedState, typename WriteRequest, typename WriteResponseValue,
typename ReadRequest, typename ReadResponseValue>
requires Rsm<WriteRequest, ReadRequest, ReplicatedState, WriteResponseValue, ReadResponseValue>
/// Parameter Purpose
/// --------------------------
/// IoImpl the concrete Io provider - SimulatorTransport, ThriftTransport, etc...
/// ReplicatedState the high-level data structure that is managed by the raft-backed replicated state machine
/// WriteOperation the individual operation type that is applied to the ReplicatedState in identical order
/// across each replica
/// WriteResponseValue the return value of calling ReplicatedState::write(WriteOperation), which is executed in
/// identical order across all replicas after an WriteOperation reaches consensus.
/// ReadOperation the type of operations that do not require consensus before executing directly
/// on a const ReplicatedState &
/// ReadResponseValue the return value of calling ReplicatedState::read(ReadOperation), which is executed directly
/// without going through consensus first
template <typename IoImpl, typename ReplicatedState, typename WriteOperation, typename WriteResponseValue,
typename ReadOperation, typename ReadResponseValue>
// requires Rsm<WriteRequest<WriteOperation>, ReadOperation, ReplicatedState, WriteResponseValue, ReadResponseValue>
class Raft {
CommonState<WriteRequest> state_;
CommonState<WriteOperation> state_;
Role role_ = Candidate{};
Io<IoImpl> io_;
std::vector<Address> peers_;
@ -208,8 +230,9 @@ class Raft {
Duration receive_timeout = RandomTimeout(10000, 50000);
auto request_result = io_.template ReceiveWithTimeout<ReadRequest, AppendRequest<WriteRequest>, AppendResponse,
WriteRequest, VoteRequest, VoteResponse>(receive_timeout);
auto request_result =
io_.template ReceiveWithTimeout<ReadRequest<ReadOperation>, AppendRequest<WriteOperation>, AppendResponse,
WriteRequest<WriteOperation>, VoteRequest, VoteResponse>(receive_timeout);
if (request_result.HasError()) {
continue;
}
@ -237,7 +260,7 @@ class Raft {
size_t new_committed_log_size = indices[(indices.size() / 2)];
// TODO(tyler / gabor) for each index between the old
// index and the new one, apply that log's WriteRequest
// index and the new one, apply that log's WriteOperation
// to our replicated_state_, and use the specific return
// value of the ReplicatedState::apply method (WriteResponseValue)
// to respondto the requester.
@ -252,10 +275,10 @@ class Raft {
const auto &front = leader.pending_client_requests.front();
if (front.log_index <= state_.committed_log_size) {
const auto &write_request = state_.log[front.log_index].second;
WriteResponseValue write_response = replicated_state_.apply(write_request);
WriteResponseValue write_return = replicated_state_.apply(write_request);
WriteResponse<WriteResponseValue> resp;
resp.success = true;
resp.write_value = write_response;
resp.write_return = write_return;
// Log("responding SUCCESS to client");
// WriteResponse rr{
// .success = true,
@ -275,7 +298,7 @@ class Raft {
for (auto &[address, follower] : followers) {
const LogIndex index = follower.confirmed_contiguous_index;
std::vector<std::pair<Term, WriteRequest>> entries;
std::vector<std::pair<Term, WriteOperation>> entries;
if (state_.log.size() > index) {
entries.insert(entries.begin(), state_.log.begin() + index, state_.log.end());
@ -286,7 +309,7 @@ class Raft {
Log("sending ", entries.size(), " entries to Follower ", address.last_known_port,
" which are above its known index of ", index);
AppendRequest<WriteRequest> ar{
AppendRequest<WriteOperation> ar{
.term = state_.term,
.last_log_index = index,
.last_log_term = previous_term_from_index,
@ -454,8 +477,8 @@ class Raft {
/////////////////////////////////////////////////////////////
// Don't we need more stuff in this variant?
void Handle(std::variant<ReadRequest, AppendRequest<WriteRequest>, AppendResponse, WriteRequest, VoteRequest,
VoteResponse> &&message_variant,
void Handle(std::variant<ReadRequest<ReadOperation>, AppendRequest<WriteOperation>, AppendResponse,
WriteRequest<WriteOperation>, VoteRequest, VoteResponse> &&message_variant,
RequestId request_id, Address from_address) {
// dispatch the message to a handler based on our role,
// which can be specified in the Handle first argument,
@ -565,7 +588,7 @@ class Raft {
}
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &role, AppendRequest<WriteRequest> &&req, RequestId request_id,
std::optional<Role> Handle(AllRoles &role, AppendRequest<WriteOperation> &&req, RequestId request_id,
Address from_address) {
AppendResponse res{
.success = false,
@ -684,20 +707,31 @@ class Raft {
/////////////////////////////////////////////////////////////
// Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState
std::optional<Role> Handle(Leader &leader, ReadRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Leader &leader, ReadRequest<ReadOperation> &&req, RequestId request_id,
Address from_address) {
// TODO(tyler / gabor) implement
ReadResponse<ReadResponseValue> resp = replicated_state_.read(req);
io_.send(from_address, request_id, resp);
ReadOperation read_operation = req.operation;
ReadResponseValue read_return = replicated_state_.read(read_operation);
ReadResponse<ReadResponseValue> resp{
.success = true,
.read_return = std::move(read_return),
.retry_leader = std::nullopt,
};
io_.Send(from_address, request_id, resp);
return std::nullopt;
}
// Candidates should respond with a failure, similar to the Candidate + WriteRequest failure below
std::optional<Role> Handle(Candidate &, ReadRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Candidate &, ReadRequest<ReadOperation> &&req, RequestId request_id,
Address from_address) {
// TODO(tyler / gabor) implement
Log("received ReadRequest - not redirecting because no Leader is known");
Log("received ReadOperation - not redirecting because no Leader is known");
auto res = ReadResponse<ReadResponseValue>{};
res.success = false;
@ -710,7 +744,8 @@ class Raft {
}
// Leaders should respond with a redirection, similar to the Follower + WriteRequest response below
std::optional<Role> Handle(Follower &follower, ReadRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Follower &follower, ReadRequest<ReadOperation> &&req, RequestId request_id,
Address from_address) {
// TODO(tyler / gabor) implement
auto res = ReadResponse<ReadResponseValue>{};
@ -729,7 +764,8 @@ class Raft {
// server. If the clients first choice is not the leader, that
// server will reject the clients request and supply information
// about the most recent leader it has heard from.
std::optional<Role> Handle(Follower &follower, WriteRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Follower &follower, WriteRequest<WriteOperation> &&req, RequestId request_id,
Address from_address) {
auto res = WriteResponse<WriteResponseValue>{};
res.success = false;
@ -741,7 +777,8 @@ class Raft {
return std::nullopt;
}
std::optional<Role> Handle(Candidate &, WriteRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Candidate &, WriteRequest<WriteOperation> &&req, RequestId request_id,
Address from_address) {
Log("received WriteRequest - not redirecting because no Leader is known");
auto res = WriteResponse<WriteResponseValue>{};
@ -755,11 +792,12 @@ class Raft {
}
// only leaders actually handle replication requests from clients
std::optional<Role> Handle(Leader &leader, WriteRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Leader &leader, WriteRequest<WriteOperation> &&req, RequestId request_id,
Address from_address) {
Log("received WriteRequest");
// we are the leader. add item to log and send Append to peers
state_.log.emplace_back(std::pair(state_.term, std::move(req.opaque_data)));
state_.log.emplace_back(std::pair(state_.term, std::move(req.operation)));
PendingClientRequest pcr{
.log_index = state_.log.size() - 1,

View File

@ -28,8 +28,10 @@ using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::rsm::Raft;
// using memgraph::io::rsm::ReplicationRequest;
// using memgraph::io::rsm::ReplicationResponse;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
@ -44,24 +46,22 @@ struct CasRequest {
struct CasResponse {
bool success;
std::optional<int> last_value;
std::optional<Address> retry_leader;
};
struct ReadRequest {
struct GetRequest {
int key;
};
struct ReadResponse {
struct GetResponse {
int value;
std::optional<Address> retry_leader;
};
class TestState {
std::map<int, int> state_;
public:
ReadResponse read(ReadRequest request) {
ReadResponse ret;
GetResponse read(GetRequest request) {
GetResponse ret;
ret.value = state_.at(request.key);
return ret;
}
@ -113,7 +113,7 @@ class TestState {
};
template <typename IoImpl>
void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, ReadRequest, ReadResponse> server) {
void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetResponse> server) {
server.Run();
}
@ -144,7 +144,7 @@ void RunSimulation() {
std::vector<Address> srv_3_peers = {srv_addr_1, srv_addr_2};
// TODO(tyler / gabor) supply default TestState to Raft constructor
using RaftClass = Raft<SimulatorTransport, TestState, CasRequest, CasResponse, ReadRequest, ReadResponse>;
using RaftClass = Raft<SimulatorTransport, TestState, CasRequest, CasResponse, GetRequest, GetResponse>;
RaftClass srv_1{std::move(srv_io_1), srv_1_peers, TestState{}};
RaftClass srv_2{std::move(srv_io_2), srv_2_peers, TestState{}};
RaftClass srv_3{std::move(srv_io_3), srv_3_peers, TestState{}};
@ -167,27 +167,30 @@ void RunSimulation() {
while (true) {
// send request
CasRequest cli_req;
cli_req.key = 0;
cli_req.old_value = std::nullopt;
cli_req.new_value = 12;
CasRequest cas_req;
cas_req.key = 0;
cas_req.old_value = std::nullopt;
cas_req.new_value = 12;
WriteRequest<CasRequest> cli_req;
cli_req.operation = cas_req;
// TODO(tyler / gabor) replace Replication* with Cas/Read
std::cout << "client sending ReplicationRequest to Leader " << leader.last_known_port << std::endl;
ResponseFuture<CasResponse> response_future =
cli_io.RequestWithTimeout<CasRequest, CasResponse>(leader, cli_req, 50000);
ResponseFuture<WriteResponse<CasResponse>> response_future =
cli_io.RequestWithTimeout<WriteRequest<CasRequest>, WriteResponse<CasResponse>>(leader, cli_req, 50000);
// receive response
ResponseResult<CasResponse> response_result = response_future.Wait();
ResponseResult<WriteResponse<CasResponse>> response_result = response_future.Wait();
if (response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
continue;
}
ResponseEnvelope<CasResponse> response_envelope = response_result.GetValue();
CasResponse response = response_envelope.message;
ResponseEnvelope<WriteResponse<CasResponse>> response_envelope = response_result.GetValue();
WriteResponse<CasResponse> response = response_envelope.message;
if (response.success) {
success = true;