Unfinished rsm changes

This commit is contained in:
gvolfing 2022-08-03 14:39:09 +02:00
parent 997fdf5a16
commit b5cff5999b
2 changed files with 133 additions and 43 deletions
src/io/rsm
tests/simulation

View File

@ -164,10 +164,18 @@ concept Rsm = requires(T t, Write w)
};
*/
template <typename WriteRequest, typename ReadRequest, typename T, typename WriteResponseValue,
typename ReadResponseValue>
concept Rsm = requires(T t, WriteRequest w, ReadRequest r) {
{ t.read(r) } -> std::same_as<ReadResponseValue>;
{ t.apply(w) } -> std::same_as<WriteResponseValue>;
};
template <typename IoImpl, typename ReplicatedState, typename WriteRequest, typename WriteResponseValue,
typename ReadRequest, typename ReadResponseValue>
requires Rsm<WriteRequest, ReadRequest, ReplicatedState, WriteResponseValue, ReadResponseValue>
class Raft {
CommonState state_;
CommonState<WriteRequest> state_;
Role role_ = Candidate{};
Io<IoImpl> io_;
std::vector<Address> peers_;
@ -175,7 +183,7 @@ class Raft {
public:
Raft(Io<IoImpl> &&io, std::vector<Address> peers, ReplicatedState &&replicated_state)
: io_(std::move(io)), peers_(peers) replicated_state_(std::move(replicated_state)) {}
: io_(std::move(io)), peers_(peers), replicated_state_(std::move(replicated_state)) {}
void Run() {
Time last_cron = io_.Now();
@ -233,12 +241,17 @@ class Raft {
while (!leader.pending_client_requests.empty()) {
const auto &front = leader.pending_client_requests.front();
if (front.log_index <= state_.committed_log_size) {
Log("responding SUCCESS to client");
WriteResponse rr{
.success = true,
.retry_leader = std::nullopt,
};
io_.Send(front.address, front.request_id, std::move(rr));
const auto &write_request = state_.log[front.log_index].second;
WriteResponseValue write_response = replicated_state_.apply(write_request);
WriteResponse<WriteResponseValue> resp;
resp.success = true;
resp.write_value = write_response;
// Log("responding SUCCESS to client");
// WriteResponse rr{
// .success = true,
// .retry_leader = std::nullopt,
// };
io_.Send(front.address, front.request_id, std::move(resp));
leader.pending_client_requests.pop_front();
} else {
break;
@ -430,7 +443,9 @@ class Raft {
/// message that has been received.
/////////////////////////////////////////////////////////////
void Handle(std::variant<AppendRequest, AppendResponse, Write, VoteRequest, VoteResponse> &&message_variant,
// Don't we need more stuff in this variant?
void Handle(std::variant<AppendRequest<WriteRequest>, AppendResponse, WriteRequest, VoteRequest, VoteResponse>
&&message_variant,
RequestId request_id, Address from_address) {
// dispatch the message to a handler based on our role,
// which can be specified in the Handle first argument,
@ -540,7 +555,8 @@ class Raft {
}
template <typename AllRoles>
std::optional<Role> Handle(AllRoles &role, AppendRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(AllRoles &role, AppendRequest<WriteRequest> &&req, RequestId request_id,
Address from_address) {
AppendResponse res{
.success = false,
.term = state_.term,
@ -658,20 +674,43 @@ class Raft {
/////////////////////////////////////////////////////////////
// Leaders are able to immediately respond to the requester (with a ReadResponseValue) applied to the ReplicatedState
std::optional<Role> Handle(Leader &leader, ReadRequest &&res, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Leader &leader, ReadRequest &&req, RequestId request_id, Address from_address) {
// TODO(tyler / gabor) implement
ReadResponse<ReadResponseValue> resp = replicated_state_.read(req);
io_.send(from_address, request_id, resp);
return std::nullopt;
}
// Candidates should respond with a failure, similar to the Candidate + WriteRequest failure below
std::optional<Role> Handle(Candidate &, ReadRequest &&req, RequestId request_id, Address from_address) {
// TODO(tyler / gabor) implement
Log("received ReadRequest - not redirecting because no Leader is known");
auto res = ReadResponse<ReadResponseValue>{};
res.success = false;
Cron();
io_.Send(from_address, request_id, res);
return std::nullopt;
}
// Leaders should respond with a redirection, similar to the Follower + WriteRequest response below
std::optional<Role> Handle(Follower &, ReadRequest &&req, RequestId request_id, Address from_address) {
std::optional<Role> Handle(Follower &follower, ReadRequest &&req, RequestId request_id, Address from_address) {
// TODO(tyler / gabor) implement
auto res = ReadResponse<ReadResponseValue>{};
res.success = false;
Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port);
res.retry_leader = follower.leader_address;
io_.Send(from_address, request_id, res);
return std::nullopt;
}
@ -681,7 +720,7 @@ class Raft {
// server will reject the clients request and supply information
// about the most recent leader it has heard from.
std::optional<Role> Handle(Follower &follower, WriteRequest &&req, RequestId request_id, Address from_address) {
auto res = WriteResponse{};
auto res = WriteResponse<WriteResponseValue>{};
res.success = false;
Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port);
@ -694,7 +733,7 @@ class Raft {
std::optional<Role> Handle(Candidate &, WriteRequest &&req, RequestId request_id, Address from_address) {
Log("received WriteRequest - not redirecting because no Leader is known");
auto res = WriteResponse{};
auto res = WriteResponse<WriteResponseValue>{};
res.success = false;

View File

@ -12,12 +12,15 @@
#include <deque>
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <thread>
#include <vector>
#include "io/address.hpp"
#include "io/rsm/raft.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
using memgraph::io::Address;
using memgraph::io::Io;
@ -25,35 +28,23 @@ using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::ResponseResult;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReplicationRequest;
using memgraph::io::rsm::ReplicationResponse;
// using memgraph::io::rsm::ReplicationRequest;
// using memgraph::io::rsm::ReplicationResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
class TestState {
std::map<int, int> state_;
public:
ReadResponse read(ReadRequest request) {
// TODO(tyler / gabor) implement
}
CasResponse apply(CasRequest request) {
// TODO(tyler / gabor) implement
}
};
struct Cas {
struct CasRequest {
int key;
int old_value;
int new_value;
std::optional<int> old_value;
std::optional<int> new_value;
};
struct CasResponse {
bool success;
std::optional<int> last_value;
std::optional<Address> retry_leader;
};
struct ReadRequest {
@ -62,10 +53,67 @@ struct ReadRequest {
struct ReadResponse {
int value;
std::optional<Address> retry_leader;
};
class TestState {
std::map<int, int> state_;
public:
ReadResponse read(ReadRequest request) {
ReadResponse ret;
ret.value = state_.at(request.key);
return ret;
}
CasResponse apply(CasRequest request) {
CasResponse ret;
// TODO(gabor) remove my annoying-ass comments
// Key exist
if (state_.find(request.key) != state_.end()) {
auto &val = state_[request.key];
/*
* Delete
*/
if (!request.new_value) {
ret.last_value = val;
ret.success = true;
state_.erase(state_.find(request.key));
}
/*
* Update
*/
// Does old_value match?
if (request.old_value == val) {
ret.last_value = val;
ret.success = true;
val = request.new_value.value();
} else {
ret.last_value = val;
ret.success = false;
}
}
/*
* Create
*/
else {
ret.last_value = std::nullopt;
ret.success = true;
state_[request.key] = request.new_value.value();
}
return ret;
}
};
template <typename IoImpl>
void RunRaft(Raft<IoImpl, TestState, Cas, CasResponse, ReadRequest, ReadResponse> server) {
void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, ReadRequest, ReadResponse> server) {
server.Run();
}
@ -96,9 +144,10 @@ void RunSimulation() {
std::vector<Address> srv_3_peers = {srv_addr_1, srv_addr_2};
// TODO(tyler / gabor) supply default TestState to Raft constructor
Raft srv_1{std::move(srv_io_1), srv_1_peers};
Raft srv_2{std::move(srv_io_2), srv_2_peers};
Raft srv_3{std::move(srv_io_3), srv_3_peers};
using RaftClass = Raft<SimulatorTransport, TestState, CasRequest, CasResponse, ReadRequest, ReadResponse>;
RaftClass srv_1{std::move(srv_io_1), srv_1_peers, TestState{}};
RaftClass srv_2{std::move(srv_io_2), srv_2_peers, TestState{}};
RaftClass srv_3{std::move(srv_io_3), srv_3_peers, TestState{}};
auto srv_thread_1 = std::jthread(RunRaft<SimulatorTransport>, std::move(srv_1));
simulator.IncrementServerCountAndWaitForQuiescentState(srv_addr_1);
@ -118,25 +167,27 @@ void RunSimulation() {
while (true) {
// send request
ReplicationRequest cli_req;
cli_req.opaque_data = std::vector<uint8_t>{1, 2, 3, 4};
CasRequest cli_req;
cli_req.key = 0;
cli_req.old_value = std::nullopt;
cli_req.new_value = 12;
// TODO(tyler / gabor) replace Replication* with Cas/Read
std::cout << "client sending ReplicationRequest to Leader " << leader.last_known_port << std::endl;
ResponseFuture<ReplicationResponse> response_future =
cli_io.RequestWithTimeout<ReplicationRequest, ReplicationResponse>(leader, cli_req, 50000);
ResponseFuture<CasResponse> response_future =
cli_io.RequestWithTimeout<CasRequest, CasResponse>(leader, cli_req, 50000);
// receive response
ResponseResult<ReplicationResponse> response_result = response_future.Wait();
ResponseResult<CasResponse> response_result = response_future.Wait();
if (response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
continue;
}
ResponseEnvelope<ReplicationResponse> response_envelope = response_result.GetValue();
ReplicationResponse response = response_envelope.message;
ResponseEnvelope<CasResponse> response_envelope = response_result.GetValue();
CasResponse response = response_envelope.message;
if (response.success) {
success = true;