From f8215306e833110729ed113178e438673166ec60 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 29 Nov 2022 15:06:01 +0000 Subject: [PATCH] A variety of small code clean-ups, remove overloaded methods --- src/query/v2/request_router.hpp | 119 +++++++++++++++----------------- 1 file changed, 56 insertions(+), 63 deletions(-) diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 05ce6eb2c..58caa95c0 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -240,42 +240,70 @@ class RequestRouter : public RequestRouterInterface { std::vector ScanVertices(std::optional label) override { ExecutionState state = {}; state.label = label; + + // create requests InitializeExecutionState(state); + + // begin all requests in parallel + for (auto &request : state.requests) { + auto &storage_client = GetStorageClientForShard(request.shard); + msgs::ReadRequests req = request.request; + + request.async_request_token = storage_client.SendAsyncReadRequest(request.request); + } + + // drive requests to completion std::vector responses; - - SendAllRequests(state); - do { DriveReadResponses(state, responses); } while (!state.requests.empty()); - // TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return - // result of storage_client.SendReadRequest()). - return PostProcess(std::move(responses)); + // convert responses into VertexAccessor objects to return + std::vector accessors; + for (auto &response : responses) { + for (auto &result_row : response.results) { + accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this)); + } + } + + return accessors; } std::vector CreateVertices(std::vector new_vertices) override { ExecutionState state = {}; MG_ASSERT(!new_vertices.empty()); + + // create requests InitializeExecutionState(state, new_vertices); + + // begin all requests in parallel + for (auto &request : state.requests) { + auto req_deep_copy = request.request; + + for (auto &new_vertex : req_deep_copy.new_vertices) { + new_vertex.label_ids.erase(new_vertex.label_ids.begin()); + } + + auto &storage_client = GetStorageClientForShard(request.shard); + + msgs::WriteRequests req = req_deep_copy; + request.async_request_token = storage_client.SendAsyncWriteRequest(req); + } + + // drive requests to completion std::vector responses; - - // 1. Send the requests. - SendAllRequests(state); - - // 2. Block untill all the futures are exhausted do { DriveWriteResponses(state, responses); } while (!state.requests.empty()); - // TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return - // result of storage_client.SendReadRequest()). return responses; } std::vector CreateExpand(std::vector new_edges) override { ExecutionState state = {}; MG_ASSERT(!new_edges.empty()); + + // create requests InitializeExecutionState(state, new_edges); std::vector responses; for (auto &request : state.requests) { @@ -305,16 +333,24 @@ class RequestRouter : public RequestRouterInterface { // match (u:L1 { prop : 1 })-[:Friend]-(v:L1) // For each vertex U, the ExpandOne will result in . The destination vertex and its properties // must be fetched again with an ExpandOne(Edges.dst) + + // create requests InitializeExecutionState(state, std::move(request)); + + // begin all requests in parallel + for (auto &request : state.requests) { + auto &storage_client = GetStorageClientForShard(request.shard); + msgs::ReadRequests req = request.request; + request.async_request_token = storage_client.SendAsyncReadRequest(req); + } + + // drive requests to completion std::vector responses; - - // 1. Send the requests. - SendAllRequests(state); - - // 2. Block untill all the futures are exhausted do { DriveReadResponses(state, responses); } while (!state.requests.empty()); + + // post-process responses std::vector result_rows; const auto total_row_count = std::accumulate(responses.begin(), responses.end(), 0, [](const int64_t partial_count, const msgs::ExpandOneResponse &resp) { @@ -326,6 +362,7 @@ class RequestRouter : public RequestRouterInterface { result_rows.insert(result_rows.end(), std::make_move_iterator(response.result.begin()), std::make_move_iterator(response.result.end())); } + return result_rows; } @@ -342,16 +379,6 @@ class RequestRouter : public RequestRouterInterface { } private: - std::vector PostProcess(std::vector &&responses) const { - std::vector accessors; - for (auto &response : responses) { - for (auto &result_row : response.results) { - accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this)); - } - } - return accessors; - } - void InitializeExecutionState(ExecutionState &state, std::vector new_vertices) { state.transaction_id = transaction_id_; @@ -359,7 +386,7 @@ class RequestRouter : public RequestRouterInterface { std::map per_shard_request_table; for (auto &new_vertex : new_vertices) { - MG_ASSERT(!new_vertex.label_ids.empty(), "This is error!"); + MG_ASSERT(!new_vertex.label_ids.empty(), "No label_ids provided for new vertex in RequestRouter::CreateVertices"); auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id, storage::conversions::ConvertPropertyVector(new_vertex.primary_key)); if (!per_shard_request_table.contains(shard)) { @@ -499,40 +526,6 @@ class RequestRouter : public RequestRouterInterface { storage_cli_manager_.AddClient(target_shard, std::move(cli)); } - void SendAllRequests(ExecutionState &state) { - for (auto &request : state.requests) { - const auto ¤t_shard = request.shard; - - auto &storage_client = GetStorageClientForShard(current_shard); - msgs::ReadRequests req = request.request; - - request.async_request_token = storage_client.SendAsyncReadRequest(request.request); - } - } - - void SendAllRequests(ExecutionState &state) { - for (auto &request : state.requests) { - auto req_deep_copy = request.request; - - for (auto &new_vertex : req_deep_copy.new_vertices) { - new_vertex.label_ids.erase(new_vertex.label_ids.begin()); - } - - auto &storage_client = GetStorageClientForShard(request.shard); - - msgs::WriteRequests req = req_deep_copy; - request.async_request_token = storage_client.SendAsyncWriteRequest(req); - } - } - - void SendAllRequests(ExecutionState &state) { - for (auto &request : state.requests) { - auto &storage_client = GetStorageClientForShard(request.shard); - msgs::ReadRequests req = request.request; - request.async_request_token = storage_client.SendAsyncReadRequest(req); - } - } - template void DriveReadResponses(ExecutionState &state, std::vector &responses) { for (auto &request : state.requests) {