Simplify and parallelize CreateExpand

This commit is contained in:
Tyler Neely 2022-11-29 15:07:59 +00:00
parent f8215306e8
commit 7df3a743b9

View File

@ -305,24 +305,20 @@ class RequestRouter : public RequestRouterInterface {
// create requests
InitializeExecutionState(state, new_edges);
std::vector<msgs::CreateExpandResponse> responses;
// begin all requests in parallel
for (auto &request : state.requests) {
auto &storage_client = GetStorageClientForShard(request.shard);
msgs::WriteRequests req = request.request;
auto write_response_result = storage_client.SendWriteRequest(std::move(req));
if (write_response_result.HasError()) {
throw std::runtime_error("CreateVertices request timedout");
}
msgs::WriteResponses response_variant = write_response_result.GetValue();
msgs::CreateExpandResponse mapped_response = std::get<msgs::CreateExpandResponse>(response_variant);
if (mapped_response.error) {
throw std::runtime_error("CreateExpand request did not succeed");
}
responses.push_back(mapped_response);
msgs::ReadRequests req = request.request;
request.async_request_token = storage_client.SendAsyncReadRequest(req);
}
// We are done with this state
state.requests.clear();
// drive requests to completion
std::vector<msgs::ExpandOneResponse> responses;
do {
DriveReadResponses(state, responses);
} while (!state.requests.empty());
return responses;
}