Use the Async methods under the hood for the blocking RsmClient::Send*Request methods for code-reuse

This commit is contained in:
Tyler Neely 2022-11-29 11:33:49 +00:00
parent a308ee501a
commit 1b77e029ca

View File

@ -101,68 +101,21 @@ class RsmClient {
~RsmClient() = default; ~RsmClient() = default;
BasicResult<TimedOut, WriteResponseT> SendWriteRequest(WriteRequestT req) { BasicResult<TimedOut, WriteResponseT> SendWriteRequest(WriteRequestT req) {
WriteRequest<WriteRequestT> client_req; auto token = SendAsyncWriteRequest(req);
client_req.operation = req; auto poll_result = AwaitAsyncWriteRequest(token);
while (!poll_result) {
const Duration overall_timeout = io_.GetDefaultTimeout(); poll_result = AwaitAsyncWriteRequest(token);
const Time before = io_.Now(); }
return poll_result.value();
do {
spdlog::debug("client sending WriteRequest to Leader {}", leader_.ToString());
ResponseFuture<WriteResponse<WriteResponseT>> response_future =
io_.template Request<WriteRequest<WriteRequestT>, WriteResponse<WriteResponseT>>(leader_, client_req);
ResponseResult<WriteResponse<WriteResponseT>> response_result = std::move(response_future).Wait();
if (response_result.HasError()) {
spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString());
return response_result.GetError();
}
ResponseEnvelope<WriteResponse<WriteResponseT>> &&response_envelope = std::move(response_result.GetValue());
WriteResponse<WriteResponseT> &&write_response = std::move(response_envelope.message);
if (write_response.success) {
return std::move(write_response.write_return);
}
PossiblyRedirectLeader(write_response);
} while (io_.Now() < before + overall_timeout);
return TimedOut{};
} }
BasicResult<TimedOut, ReadResponseT> SendReadRequest(ReadRequestT req) { BasicResult<TimedOut, ReadResponseT> SendReadRequest(ReadRequestT req) {
ReadRequest<ReadRequestT> read_req; auto token = SendAsyncReadRequest(req);
read_req.operation = req; auto poll_result = AwaitAsyncReadRequest(token);
while (!poll_result) {
const Duration overall_timeout = io_.GetDefaultTimeout(); poll_result = AwaitAsyncReadRequest(token);
const Time before = io_.Now(); }
return poll_result.value();
do {
spdlog::debug("client sending ReadRequest to Leader {}", leader_.ToString());
ResponseFuture<ReadResponse<ReadResponseT>> get_response_future =
io_.template Request<ReadRequest<ReadRequestT>, ReadResponse<ReadResponseT>>(leader_, read_req);
// receive response
ResponseResult<ReadResponse<ReadResponseT>> get_response_result = std::move(get_response_future).Wait();
if (get_response_result.HasError()) {
spdlog::debug("client timed out while trying to communicate with leader server {}", leader_.ToString());
return get_response_result.GetError();
}
ResponseEnvelope<ReadResponse<ReadResponseT>> &&get_response_envelope = std::move(get_response_result.GetValue());
ReadResponse<ReadResponseT> &&read_get_response = std::move(get_response_envelope.message);
if (read_get_response.success) {
return std::move(read_get_response.read_return);
}
PossiblyRedirectLeader(read_get_response);
} while (io_.Now() < before + overall_timeout);
return TimedOut{};
} }
/// AsyncRead methods /// AsyncRead methods