Add abstraction for RsmClient
This commit is contained in:
parent
c7282e8935
commit
7af917e408
tests/simulation
@ -31,4 +31,4 @@ add_simulation_test(raft.cpp address)
|
||||
|
||||
add_simulation_test(trial_query_storage/query_storage_test.cpp address)
|
||||
|
||||
add_simulation_test(sharded_map.cpp address)
|
||||
#add_simulation_test(sharded_map.cpp address)
|
||||
|
@ -116,6 +116,91 @@ class TestState {
|
||||
}
|
||||
};
|
||||
|
||||
template <typename IoImpl, typename WriteRequestT, typename WriteResponseT, typename ReadRequestT,
|
||||
typename ReadResponseT>
|
||||
class RsmClient {
|
||||
using ServerPool = std::vector<Address>;
|
||||
|
||||
IoImpl io_;
|
||||
Address leader_;
|
||||
|
||||
std::mt19937 cli_rng_{0};
|
||||
ServerPool server_addrs_;
|
||||
|
||||
template <typename ResponseT>
|
||||
std::optional<ResponseT> CheckForCorrectLeader(ResponseT response) {
|
||||
if (response.retry_leader) {
|
||||
MG_ASSERT(!response.success, "retry_leader should never be set for successful responses");
|
||||
leader_ = response.retry_leader.value();
|
||||
std::cout << "client redirected to leader server " << leader_.last_known_port << std::endl;
|
||||
} else if (!response.success) {
|
||||
std::uniform_int_distribution<size_t> addr_distrib(0, (server_addrs_.size() - 1));
|
||||
size_t addr_index = addr_distrib(cli_rng_);
|
||||
leader_ = server_addrs_[addr_index];
|
||||
|
||||
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
|
||||
<< " with port " << leader_.last_known_port << std::endl;
|
||||
return {};
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
public:
|
||||
RsmClient(IoImpl &&io, Address &&leader, ServerPool &&server_addrs)
|
||||
: io_{io}, leader_{leader}, server_addrs_{server_addrs} {}
|
||||
|
||||
RsmClient() = delete;
|
||||
|
||||
std::optional<WriteResponse<WriteResponseT>> SendWriteRequest(WriteRequestT req) {
|
||||
WriteRequest<WriteRequestT> client_req;
|
||||
client_req.operation = req;
|
||||
|
||||
std::cout << "client sending CasRequest to Leader " << leader_.last_known_port << std::endl;
|
||||
ResponseFuture<WriteResponse<WriteResponseT>> response_future =
|
||||
io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
|
||||
ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
|
||||
|
||||
if (response_result.HasError()) {
|
||||
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
|
||||
// continue;
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
ResponseEnvelope<WriteResponse<WriteResponseT>> response_envelope = response_result.GetValue();
|
||||
WriteResponse<WriteResponseT> write_response = response_envelope.message;
|
||||
|
||||
return CheckForCorrectLeader(write_response);
|
||||
}
|
||||
|
||||
std::optional<ReadResponse<ReadResponseT>> SendReadRequest(ReadRequestT req) {
|
||||
ReadRequest<ReadRequestT> read_req;
|
||||
read_req.operation = req;
|
||||
|
||||
std::cout << "client sending GetRequest to Leader " << leader_.last_known_port << std::endl;
|
||||
ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
|
||||
io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
|
||||
|
||||
// receive response
|
||||
ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait();
|
||||
|
||||
if (get_response_result.HasError()) {
|
||||
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
|
||||
return {};
|
||||
}
|
||||
|
||||
ResponseEnvelope<ReadResponse<ReadResponseT>> get_response_envelope = get_response_result.GetValue();
|
||||
ReadResponse<ReadResponseT> read_get_response = get_response_envelope.message;
|
||||
|
||||
if (!read_get_response.success) {
|
||||
// sent to a non-leader
|
||||
return {};
|
||||
}
|
||||
|
||||
return CheckForCorrectLeader(read_get_response);
|
||||
}
|
||||
};
|
||||
|
||||
template <typename IoImpl>
|
||||
void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetResponse> server) {
|
||||
server.Run();
|
||||
@ -147,7 +232,6 @@ void RunSimulation() {
|
||||
std::vector<Address> srv_2_peers = {srv_addr_1, srv_addr_3};
|
||||
std::vector<Address> srv_3_peers = {srv_addr_1, srv_addr_2};
|
||||
|
||||
// TODO(tyler / gabor) supply default TestState to Raft constructor
|
||||
using RaftClass = Raft<SimulatorTransport, TestState, CasRequest, CasResponse, GetRequest, GetResponse>;
|
||||
RaftClass srv_1{std::move(srv_io_1), srv_1_peers, TestState{}};
|
||||
RaftClass srv_2{std::move(srv_io_2), srv_2_peers, TestState{}};
|
||||
@ -165,16 +249,21 @@ void RunSimulation() {
|
||||
std::cout << "beginning test after servers have become quiescent" << std::endl;
|
||||
|
||||
std::mt19937 cli_rng_{0};
|
||||
Address server_addrs[]{srv_addr_1, srv_addr_2, srv_addr_3};
|
||||
std::vector<Address> server_addrs{srv_addr_1, srv_addr_2, srv_addr_3};
|
||||
Address leader = server_addrs[0];
|
||||
|
||||
RsmClient<Io<SimulatorTransport>, CasRequest, CasResponse, GetRequest, GetResponse> client(
|
||||
std::move(cli_io), std::move(leader), std::move(server_addrs));
|
||||
|
||||
const int key = 0;
|
||||
std::optional<int> last_known_value;
|
||||
|
||||
bool success = false;
|
||||
|
||||
for (int i = 0; !success; i++) {
|
||||
// send request
|
||||
/*
|
||||
* Write Request
|
||||
*/
|
||||
CasRequest cas_req;
|
||||
cas_req.key = key;
|
||||
|
||||
@ -182,37 +271,11 @@ void RunSimulation() {
|
||||
|
||||
cas_req.new_value = i;
|
||||
|
||||
WriteRequest<CasRequest> cli_req;
|
||||
cli_req.operation = cas_req;
|
||||
|
||||
std::cout << "client sending CasRequest to Leader " << leader.last_known_port << std::endl;
|
||||
ResponseFuture<WriteResponse<CasResponse>> cas_response_future =
|
||||
cli_io.Request<WriteRequest<CasRequest>, WriteResponse<CasResponse>>(leader, cli_req);
|
||||
|
||||
// receive cas_response
|
||||
ResponseResult<WriteResponse<CasResponse>> cas_response_result = std::move(cas_response_future).Wait();
|
||||
|
||||
if (cas_response_result.HasError()) {
|
||||
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
ResponseEnvelope<WriteResponse<CasResponse>> cas_response_envelope = cas_response_result.GetValue();
|
||||
WriteResponse<CasResponse> write_cas_response = cas_response_envelope.message;
|
||||
|
||||
if (write_cas_response.retry_leader) {
|
||||
MG_ASSERT(!write_cas_response.success, "retry_leader should never be set for successful responses");
|
||||
leader = write_cas_response.retry_leader.value();
|
||||
std::cout << "client redirected to leader server " << leader.last_known_port << std::endl;
|
||||
} else if (!write_cas_response.success) {
|
||||
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
|
||||
size_t addr_index = addr_distrib(cli_rng_);
|
||||
leader = server_addrs[addr_index];
|
||||
|
||||
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
|
||||
<< " with port " << leader.last_known_port << std::endl;
|
||||
auto write_cas_response_opt = client.SendWriteRequest(cas_req);
|
||||
if (!write_cas_response_opt) {
|
||||
continue;
|
||||
}
|
||||
auto write_cas_response = write_cas_response_opt.value();
|
||||
|
||||
CasResponse cas_response = write_cas_response.write_return;
|
||||
|
||||
@ -228,44 +291,17 @@ void RunSimulation() {
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get Request
|
||||
*/
|
||||
GetRequest get_req;
|
||||
get_req.key = key;
|
||||
|
||||
ReadRequest<GetRequest> read_req;
|
||||
read_req.operation = get_req;
|
||||
|
||||
std::cout << "client sending GetRequest to Leader " << leader.last_known_port << std::endl;
|
||||
ResponseFuture<ReadResponse<GetResponse>> get_response_future =
|
||||
cli_io.Request<ReadRequest<GetRequest>, ReadResponse<GetResponse>>(leader, read_req);
|
||||
|
||||
// receive response
|
||||
ResponseResult<ReadResponse<GetResponse>> get_response_result = std::move(get_response_future).Wait();
|
||||
|
||||
if (get_response_result.HasError()) {
|
||||
std::cout << "client timed out while trying to communicate with leader server " << std::endl;
|
||||
auto read_get_response_opt = client.SendReadRequest(get_req);
|
||||
if (!read_get_response_opt) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ResponseEnvelope<ReadResponse<GetResponse>> get_response_envelope = get_response_result.GetValue();
|
||||
ReadResponse<GetResponse> read_get_response = get_response_envelope.message;
|
||||
|
||||
if (!read_get_response.success) {
|
||||
// sent to a non-leader
|
||||
continue;
|
||||
}
|
||||
|
||||
if (read_get_response.retry_leader) {
|
||||
MG_ASSERT(!read_get_response.success, "retry_leader should never be set for successful responses");
|
||||
leader = read_get_response.retry_leader.value();
|
||||
std::cout << "client redirected to leader server " << leader.last_known_port << std::endl;
|
||||
} else if (!read_get_response.success) {
|
||||
std::uniform_int_distribution<size_t> addr_distrib(0, 2);
|
||||
size_t addr_index = addr_distrib(cli_rng_);
|
||||
leader = server_addrs[addr_index];
|
||||
|
||||
std::cout << "client NOT redirected to leader server, trying a random one at index " << addr_index
|
||||
<< " with port " << leader.last_known_port << std::endl;
|
||||
}
|
||||
auto read_get_response = read_get_response_opt.value();
|
||||
|
||||
GetResponse get_response = read_get_response.read_return;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user