From 7df3a743b997f0e9d91a3af83e355a7cafb577a9 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 29 Nov 2022 15:07:59 +0000 Subject: [PATCH] Simplify and parallelize CreateExpand --- src/query/v2/request_router.hpp | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 58caa95c0..7a0fb9d8d 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -305,24 +305,20 @@ class RequestRouter : public RequestRouterInterface { // create requests InitializeExecutionState(state, new_edges); - std::vector responses; + + // begin all requests in parallel for (auto &request : state.requests) { auto &storage_client = GetStorageClientForShard(request.shard); - msgs::WriteRequests req = request.request; - auto write_response_result = storage_client.SendWriteRequest(std::move(req)); - if (write_response_result.HasError()) { - throw std::runtime_error("CreateVertices request timedout"); - } - msgs::WriteResponses response_variant = write_response_result.GetValue(); - msgs::CreateExpandResponse mapped_response = std::get(response_variant); - - if (mapped_response.error) { - throw std::runtime_error("CreateExpand request did not succeed"); - } - responses.push_back(mapped_response); + msgs::ReadRequests req = request.request; + request.async_request_token = storage_client.SendAsyncReadRequest(req); } - // We are done with this state - state.requests.clear(); + + // drive requests to completion + std::vector responses; + do { + DriveReadResponses(state, responses); + } while (!state.requests.empty()); + return responses; }