A variety of small code clean-ups, remove overloaded methods

This commit is contained in:
Tyler Neely 2022-11-29 15:06:01 +00:00
parent 8f08d986cb
commit f8215306e8

View File

@ -240,42 +240,70 @@ class RequestRouter : public RequestRouterInterface {
std::vector<VertexAccessor> ScanVertices(std::optional<std::string> label) override {
ExecutionState<msgs::ScanVerticesRequest> state = {};
state.label = label;
// create requests
InitializeExecutionState(state);
// begin all requests in parallel
for (auto &request : state.requests) {
auto &storage_client = GetStorageClientForShard(request.shard);
msgs::ReadRequests req = request.request;
request.async_request_token = storage_client.SendAsyncReadRequest(request.request);
}
// drive requests to completion
std::vector<msgs::ScanVerticesResponse> responses;
SendAllRequests(state);
do {
DriveReadResponses(state, responses);
} while (!state.requests.empty());
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
// result of storage_client.SendReadRequest()).
return PostProcess(std::move(responses));
// convert responses into VertexAccessor objects to return
std::vector<VertexAccessor> accessors;
for (auto &response : responses) {
for (auto &result_row : response.results) {
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this));
}
}
return accessors;
}
std::vector<msgs::CreateVerticesResponse> CreateVertices(std::vector<msgs::NewVertex> new_vertices) override {
ExecutionState<msgs::CreateVerticesRequest> state = {};
MG_ASSERT(!new_vertices.empty());
// create requests
InitializeExecutionState(state, new_vertices);
// begin all requests in parallel
for (auto &request : state.requests) {
auto req_deep_copy = request.request;
for (auto &new_vertex : req_deep_copy.new_vertices) {
new_vertex.label_ids.erase(new_vertex.label_ids.begin());
}
auto &storage_client = GetStorageClientForShard(request.shard);
msgs::WriteRequests req = req_deep_copy;
request.async_request_token = storage_client.SendAsyncWriteRequest(req);
}
// drive requests to completion
std::vector<msgs::CreateVerticesResponse> responses;
// 1. Send the requests.
SendAllRequests(state);
// 2. Block untill all the futures are exhausted
do {
DriveWriteResponses(state, responses);
} while (!state.requests.empty());
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
// result of storage_client.SendReadRequest()).
return responses;
}
std::vector<msgs::CreateExpandResponse> CreateExpand(std::vector<msgs::NewExpand> new_edges) override {
ExecutionState<msgs::CreateExpandRequest> state = {};
MG_ASSERT(!new_edges.empty());
// create requests
InitializeExecutionState(state, new_edges);
std::vector<msgs::CreateExpandResponse> responses;
for (auto &request : state.requests) {
@ -305,16 +333,24 @@ class RequestRouter : public RequestRouterInterface {
// match (u:L1 { prop : 1 })-[:Friend]-(v:L1)
// For each vertex U, the ExpandOne will result in <U, Edges>. The destination vertex and its properties
// must be fetched again with an ExpandOne(Edges.dst)
// create requests
InitializeExecutionState(state, std::move(request));
// begin all requests in parallel
for (auto &request : state.requests) {
auto &storage_client = GetStorageClientForShard(request.shard);
msgs::ReadRequests req = request.request;
request.async_request_token = storage_client.SendAsyncReadRequest(req);
}
// drive requests to completion
std::vector<msgs::ExpandOneResponse> responses;
// 1. Send the requests.
SendAllRequests(state);
// 2. Block untill all the futures are exhausted
do {
DriveReadResponses(state, responses);
} while (!state.requests.empty());
// post-process responses
std::vector<msgs::ExpandOneResultRow> result_rows;
const auto total_row_count = std::accumulate(responses.begin(), responses.end(), 0,
[](const int64_t partial_count, const msgs::ExpandOneResponse &resp) {
@ -326,6 +362,7 @@ class RequestRouter : public RequestRouterInterface {
result_rows.insert(result_rows.end(), std::make_move_iterator(response.result.begin()),
std::make_move_iterator(response.result.end()));
}
return result_rows;
}
@ -342,16 +379,6 @@ class RequestRouter : public RequestRouterInterface {
}
private:
std::vector<VertexAccessor> PostProcess(std::vector<msgs::ScanVerticesResponse> &&responses) const {
std::vector<VertexAccessor> accessors;
for (auto &response : responses) {
for (auto &result_row : response.results) {
accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this));
}
}
return accessors;
}
void InitializeExecutionState(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<msgs::NewVertex> new_vertices) {
state.transaction_id = transaction_id_;
@ -359,7 +386,7 @@ class RequestRouter : public RequestRouterInterface {
std::map<Shard, msgs::CreateVerticesRequest> per_shard_request_table;
for (auto &new_vertex : new_vertices) {
MG_ASSERT(!new_vertex.label_ids.empty(), "This is error!");
MG_ASSERT(!new_vertex.label_ids.empty(), "No label_ids provided for new vertex in RequestRouter::CreateVertices");
auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id,
storage::conversions::ConvertPropertyVector(new_vertex.primary_key));
if (!per_shard_request_table.contains(shard)) {
@ -499,40 +526,6 @@ class RequestRouter : public RequestRouterInterface {
storage_cli_manager_.AddClient(target_shard, std::move(cli));
}
void SendAllRequests(ExecutionState<msgs::ScanVerticesRequest> &state) {
for (auto &request : state.requests) {
const auto &current_shard = request.shard;
auto &storage_client = GetStorageClientForShard(current_shard);
msgs::ReadRequests req = request.request;
request.async_request_token = storage_client.SendAsyncReadRequest(request.request);
}
}
void SendAllRequests(ExecutionState<msgs::CreateVerticesRequest> &state) {
for (auto &request : state.requests) {
auto req_deep_copy = request.request;
for (auto &new_vertex : req_deep_copy.new_vertices) {
new_vertex.label_ids.erase(new_vertex.label_ids.begin());
}
auto &storage_client = GetStorageClientForShard(request.shard);
msgs::WriteRequests req = req_deep_copy;
request.async_request_token = storage_client.SendAsyncWriteRequest(req);
}
}
void SendAllRequests(ExecutionState<msgs::ExpandOneRequest> &state) {
for (auto &request : state.requests) {
auto &storage_client = GetStorageClientForShard(request.shard);
msgs::ReadRequests req = request.request;
request.async_request_token = storage_client.SendAsyncReadRequest(req);
}
}
template <typename RequestT, typename ResponseT>
void DriveReadResponses(ExecutionState<RequestT> &state, std::vector<ResponseT> &responses) {
for (auto &request : state.requests) {