Complete migration of GetProperties to new request style
This commit is contained in:
parent
6efe074313
commit
1b458ebc41
@ -359,7 +359,7 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
|
|
||||||
std::vector<msgs::GetPropertiesResultRow> GetProperties(msgs::GetPropertiesRequest requests) override {
|
std::vector<msgs::GetPropertiesResultRow> GetProperties(msgs::GetPropertiesRequest requests) override {
|
||||||
// create requests
|
// create requests
|
||||||
std::vector<msgs::GetPropertiesRequest> unsent_requests = RequestsForGetProperties(requests);
|
std::vector<ShardRequestState<msgs::GetPropertiesRequest>> unsent_requests = RequestsForGetProperties(requests);
|
||||||
|
|
||||||
// begin all requests in parallel
|
// begin all requests in parallel
|
||||||
RunningRequests<msgs::GetPropertiesRequest> running_requests = {};
|
RunningRequests<msgs::GetPropertiesRequest> running_requests = {};
|
||||||
@ -374,7 +374,16 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// drive requests to completion
|
// drive requests to completion
|
||||||
return DriveReadResponses<msgs::GetPropertiesRequest, msgs::GetPropertiesResponse>(running_requests);
|
auto responses = DriveReadResponses<msgs::GetPropertiesRequest, msgs::GetPropertiesResponse>(running_requests);
|
||||||
|
|
||||||
|
// post-process responses
|
||||||
|
std::vector<msgs::GetPropertiesResultRow> result_rows;
|
||||||
|
|
||||||
|
for (auto &&response : responses) {
|
||||||
|
std::move(response.result_row.begin(), response.result_row.end(), std::back_inserter(result_rows));
|
||||||
|
}
|
||||||
|
|
||||||
|
return result_rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<storage::v3::PropertyId> MaybeNameToProperty(const std::string &name) const override {
|
std::optional<storage::v3::PropertyId> MaybeNameToProperty(const std::string &name) const override {
|
||||||
@ -520,15 +529,14 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
return requests;
|
return requests;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<msgs::GetPropertiesRequest> RequestsForGetProperties(const msgs::GetPropertiesRequest &request) {
|
std::vector<ShardRequestState<msgs::GetPropertiesRequest>> RequestsForGetProperties(
|
||||||
|
const msgs::GetPropertiesRequest &request) {
|
||||||
std::map<Shard, msgs::GetPropertiesRequest> per_shard_request_table;
|
std::map<Shard, msgs::GetPropertiesRequest> per_shard_request_table;
|
||||||
auto top_level_rqst_template = request;
|
auto top_level_rqst_template = request;
|
||||||
top_level_rqst_template.transaction_id = transaction_id_;
|
top_level_rqst_template.transaction_id = transaction_id_;
|
||||||
top_level_rqst_template.vertex_ids.clear();
|
top_level_rqst_template.vertex_ids.clear();
|
||||||
top_level_rqst_template.vertices_and_edges.clear();
|
top_level_rqst_template.vertices_and_edges.clear();
|
||||||
|
|
||||||
state.transaction_id = transaction_id_;
|
|
||||||
|
|
||||||
for (auto &vertex : request.vertex_ids) {
|
for (auto &vertex : request.vertex_ids) {
|
||||||
auto shard =
|
auto shard =
|
||||||
shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second));
|
shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second));
|
||||||
@ -547,15 +555,18 @@ class RequestRouter : public RequestRouterInterface {
|
|||||||
per_shard_request_table[shard].vertices_and_edges.emplace_back(std::move(vertex), maybe_edge);
|
per_shard_request_table[shard].vertices_and_edges.emplace_back(std::move(vertex), maybe_edge);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::vector<ShardRequestState<msgs::GetPropertiesRequest>> requests;
|
||||||
|
|
||||||
for (auto &[shard, rqst] : per_shard_request_table) {
|
for (auto &[shard, rqst] : per_shard_request_table) {
|
||||||
ShardRequestState<msgs::GetPropertiesRequest> shard_request_state{
|
ShardRequestState<msgs::GetPropertiesRequest> shard_request_state{
|
||||||
.shard = shard,
|
.shard = shard,
|
||||||
.request = std::move(rqst),
|
.request = std::move(rqst),
|
||||||
.async_request_token = std::nullopt,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
state.requests.emplace_back(std::move(shard_request_state));
|
requests.emplace_back(std::move(shard_request_state));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return requests;
|
||||||
}
|
}
|
||||||
|
|
||||||
StorageClient &GetStorageClientForShard(Shard shard) {
|
StorageClient &GetStorageClientForShard(Shard shard) {
|
||||||
|
Loading…
Reference in New Issue
Block a user