Add abstraction for RsmClient

This commit is contained in:
gvolfing 2022-08-08 13:31:28 +02:00 committed by Tyler Neely
parent d725d587ec
commit 7a84bed200
2 changed files with 102 additions and 66 deletions
tests/simulation

View File

@ -31,4 +31,4 @@ add_simulation_test(raft.cpp address)
add_simulation_test(trial_query_storage/query_storage_test.cpp address)
add_simulation_test(sharded_map.cpp address)
#add_simulation_test(sharded_map.cpp address)

View File

@ -116,6 +116,91 @@ class TestState {
}
};
template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT,
typename ReadResponseT>
class RsmClient {
using ServerPool = std::vector<Address>;
IoImpl io_;
Address leader_;
std::mt19937 cli_rng_{0};
ServerPool server_addrs_;
template <typename ResponseT>
std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) {
if (response.retry_leader) {
MG_ASSERT(!response.success, "retry_leader should never be set for successful responses");
leader_ = response.retry_leader.value();
std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl;
} else if (!response.success) {
std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1));
size_t addr_index = addr_distrib(cli_rng_);
leader_ = server_addrs_[addr_index];
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
<< " with port " << leader_.last_known_port << std::endl;
return {};
}
return response;
}
public:
RsmClient(IoImpl &&io, Address &&leader, ServerPool &&server_addrs)
: io_{io}, leader_{leader}, server_addrs_{server_addrs} {}
RsmClient() = delete;
std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) {
WriteRequest<WriteRequestT> client_req;
client_req.operation = req;
std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl;
ResponseFuture<WriteResponse<WriteResponseT>> response_future =
io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
if (response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
// continue;
return std::nullopt;
}
ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue();
WriteResponse<WriteResponseT> write_response = response_envelope.message;
return CheckForCorrectLeader(write_response);
}
std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) {
ReadRequest<ReadRequestT> read_req;
read_req.operation = req;
std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl;
ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
// receive response
ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait();
if (get_response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
return {};
}
ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue();
ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message;
if (!read_get_response.success) {
// sent to a non-leader
return {};
}
return CheckForCorrectLeader(read_get_response);
}
};
template <typename IoImpl>
void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetResponse> server) {
server.Run();
@ -147,7 +232,6 @@ void RunSimulation() {
std::vector<Address> srv_2_peers = {srv_addr_1, srv_addr_3};
std::vector<Address> srv_3_peers = {srv_addr_1, srv_addr_2};
// TODO(tyler / gabor) supply default TestState to Raft constructor
using RaftClass = Raft<SimulatorTransport, TestState, CasRequest, CasResponse, GetRequest, GetResponse>;
RaftClass srv_1{std::move(srv_io_1), srv_1_peers, TestState{}};
RaftClass srv_2{std::move(srv_io_2), srv_2_peers, TestState{}};
@ -165,16 +249,21 @@ void RunSimulation() {
std::cout << "beginning test after servers have become quiescent" << std::endl;
std::mt19937 cli_rng_{0};
Address server_addrs[]{srv_addr_1, srv_addr_2, srv_addr_3};
std::vector<Address> server_addrs{srv_addr_1, srv_addr_2, srv_addr_3};
Address leader = server_addrs[0];
RsmClient<Io<SimulatorTransport>, CasRequest, CasResponse, GetRequest, GetResponse> client(
std::move(cli_io), std::move(leader), std::move(server_addrs));
const int key = 0;
std::optional<int> last_known_value;
bool success = false;
for (int i = 0; !success; i++) {
// send request
/*
* Write Request
*/
CasRequest cas_req;
cas_req.key = key;
@ -182,37 +271,11 @@ void RunSimulation() {
cas_req.new_value = i;
WriteRequest<CasRequest> cli_req;
cli_req.operation = cas_req;
std::cout << "client sending CasRequest to Leader " << leader.last_known_port << std::endl;
ResponseFuture<WriteResponse<CasResponse>> cas_response_future =
cli_io.Request<WriteRequest<CasRequest>, WriteResponse<CasResponse>>(leader, cli_req);
// receive cas_response
ResponseResult<WriteResponse<CasResponse>> cas_response_result = std::move(cas_response_future).Wait();
if (cas_response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
continue;
}
ResponseEnvelope<WriteResponse<CasResponse>> cas_response_envelope = cas_response_result.GetValue();
WriteResponse<CasResponse> write_cas_response = cas_response_envelope.message;
if (write_cas_response.retry_leader) {
MG_ASSERT(!write_cas_response.success, "retry_leader should never be set for successful responses");
leader = write_cas_response.retry_leader.value();
std::cout << "client redirected to leader server " << leader.last_known_port << std::endl;
} else if (!write_cas_response.success) {
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
size_t addr_index = addr_distrib(cli_rng_);
leader = server_addrs[addr_index];
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
<< " with port " << leader.last_known_port << std::endl;
auto write_cas_response_opt = client.SendWriteRequest(cas_req);
if (!write_cas_response_opt) {
continue;
}
auto write_cas_response = write_cas_response_opt.value();
CasResponse cas_response = write_cas_response.write_return;
@ -228,44 +291,17 @@ void RunSimulation() {
continue;
}
/*
* Get Request
*/
GetRequest get_req;
get_req.key = key;
ReadRequest<GetRequest> read_req;
read_req.operation = get_req;
std::cout << "client sending GetRequest to Leader " << leader.last_known_port << std::endl;
ResponseFuture<ReadResponse<GetResponse>> get_response_future =
cli_io.Request<ReadRequest<GetRequest>, ReadResponse<GetResponse>>(leader, read_req);
// receive response
ResponseResult<ReadResponse<GetResponse>> get_response_result = std::move(get_response_future).Wait();
if (get_response_result.HasError()) {
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
auto read_get_response_opt = client.SendReadRequest(get_req);
if (!read_get_response_opt) {
continue;
}
ResponseEnvelope<ReadResponse<GetResponse>> get_response_envelope = get_response_result.GetValue();
ReadResponse<GetResponse> read_get_response = get_response_envelope.message;
if (!read_get_response.success) {
// sent to a non-leader
continue;
}
if (read_get_response.retry_leader) {
MG_ASSERT(!read_get_response.success, "retry_leader should never be set for successful responses");
leader = read_get_response.retry_leader.value();
std::cout << "client redirected to leader server " << leader.last_known_port << std::endl;
} else if (!read_get_response.success) {
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
size_t addr_index = addr_distrib(cli_rng_);
leader = server_addrs[addr_index];
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
<< " with port " << leader.last_known_port << std::endl;
}
auto read_get_response = read_get_response_opt.value();
GetResponse get_response = read_get_response.read_return;