ExpandOne prototype

Kostas Kyrimis 2022-09-09 12:48:04 +03:00
parent e078947d10
commit ced3b7db06
3 changed files with 49 additions and 44 deletions

View File

@@ -31,6 +31,10 @@ Value EdgeAccessor::GetProperty(const std::string &prop_name) {
return properties[prop_name];
}
VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); }
VertexAccessor::VertexAccessor(Vertex v, std::map<std::string, Value> props)
: vertex(std::move(v)), properties(std::move(props)) {}
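
A note on the new accessors: To() and From() build a VertexAccessor from nothing but the endpoint id, so the returned accessor carries an empty property map. Below is a minimal self-contained sketch of that behavior, with the storage types reduced to plain int64_t ids for illustration (these are not the real Vertex/Edge definitions):

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>

// Simplified stand-ins for the storage value types touched by this diff.
struct Vertex { int64_t id; };
struct Edge { int64_t src; int64_t dst; };
using Value = std::string;

class VertexAccessor {
 public:
  VertexAccessor(Vertex v, std::map<std::string, Value> props)
      : vertex(std::move(v)), properties(std::move(props)) {}
  int64_t Id() const { return vertex.id; }

 private:
  Vertex vertex;
  std::map<std::string, Value> properties;
};

class EdgeAccessor {
 public:
  explicit EdgeAccessor(Edge e) : edge(e) {}
  // Only the endpoint id is known here, so the property map is empty; the
  // destination's properties require a follow-up request.
  VertexAccessor To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
  VertexAccessor From() const { return VertexAccessor(Vertex{edge.src}, {}); }

 private:
  Edge edge;
};

int main() {
  EdgeAccessor friend_edge(Edge{/*src=*/1, /*dst=*/2});
  std::cout << friend_edge.From().Id() << " -> " << friend_edge.To().Id() << '\n';
}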

View File

@@ -43,8 +43,8 @@ class EdgeAccessor final {
// bool HasSrcAccessor const { return src == nullptr; }
// bool HasDstAccessor const { return dst == nullptr; }
// VertexAccessor To() const;
// VertexAccessor From() const;
VertexAccessor To() const;
VertexAccessor From() const;
friend bool operator==(const EdgeAccessor &lhs, const EdgeAccessor &rhs) noexcept {
return lhs.edge == rhs.edge && lhs.properties == rhs.properties;
@@ -54,8 +54,6 @@ class EdgeAccessor final {
private:
Edge edge;
// VertexAccessor *src {nullptr};
// VertexAccessor *dst {nullptr};
std::map<std::string, Value> properties;
};

View File

@@ -148,10 +148,10 @@ class ShardRequestManager : public ShardRequestManagerInterface {
// RETRY on timeouts?
// Sometimes this produces a timeout. A temporary solution is to use a while(true) loop, as was done in the shard_map test.
if (read_response_result.HasError()) {
throw std::runtime_error("Read request error");
throw std::runtime_error("ScanAll request timedout");
}
if (read_response_result.GetValue().success == false) {
throw std::runtime_error("Request did not succeed");
throw std::runtime_error("ScanAll request did not succeed");
}
responses.push_back(read_response_result.GetValue());
if (!read_response_result.GetValue().next_start_id) {
@@ -184,10 +184,10 @@ class ShardRequestManager : public ShardRequestManagerInterface {
// RETRY on timeouts?
// Sometimes this produces a timeout. A temporary solution is to use a while(true) loop, as was done in the shard_map test.
if (write_response_result.HasError()) {
throw std::runtime_error("Write request error");
throw std::runtime_error("CreateVertices request timedout");
}
if (write_response_result.GetValue().success == false) {
throw std::runtime_error("Write request did not succeed");
throw std::runtime_error("CreateVertices request did not succeed");
}
responses.push_back(write_response_result.GetValue());
shard_it = shard_cache_ref.erase(shard_it);
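
The "RETRY on timeouts?" question recurs in all three request paths. One bounded alternative to the while(true) workaround from the shard_map test is a small retry helper; the sketch below assumes only that the response type exposes HasError() (SendWithRetry and max_attempts are hypothetical names, not part of this commit):

#include <chrono>
#include <stdexcept>
#include <string>
#include <thread>

// Retry a request a bounded number of times instead of looping forever.
// SendFn is any callable returning a result with HasError(), e.g. a lambda
// wrapping storage_client.SendReadRequest(state.requests[id]).
template <typename SendFn>
auto SendWithRetry(SendFn &&send, int max_attempts, const std::string &what) {
  for (int attempt = 1;; ++attempt) {
    auto result = send();
    if (!result.HasError()) {
      return result;
    }
    if (attempt == max_attempts) {
      throw std::runtime_error(what + " request timed out after " +
                               std::to_string(max_attempts) + " attempts");
    }
    // Fixed backoff for simplicity; exponential backoff would also fit here.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
}
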
@@ -200,29 +200,31 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) {
throw std::runtime_error("Not yet implemented request");
// MaybeInitializeExecutionState(state);
// std::vector<ExpandOneResponse> responses;
// auto &shard_cache_ref = state.shard_cache;
// size_t id = 0;
// for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
// // This is fine because all new_vertices of each request end up on the same shard
// const Label label = state.requests[id].new_vertices[0].label_ids;
// auto primary_key = state.requests[id].new_vertices[0].primary_key;
// auto &storage_client = GetStorageClientForShard(*shard_it, label.id);
// auto read_response_result = storage_client.SendWriteRequest(state.requests[id]);
// // RETRY on timeouts?
// // Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test
// if (read_response_result.HasError()) {
// throw std::runtime_error("Write request error");
// }
// if (read_response_result.GetValue().success == false) {
// throw std::runtime_error("Write request did not succeed");
// }
// responses.push_back(read_response_result.GetValue());
// shard_it = shard_cache_ref.erase(shard_it);
// }
// return responses;
// TODO(kostasrim) Update this to limit the batch size here.
// Expansions of the destination must be handled by the caller. For example, given
// MATCH (u:L1 { prop : 1 })-[:Friend]-(v:L1)
// ExpandOne produces <U, Edges> for each vertex U; the destination vertex and its
// properties must then be fetched again with an ExpandOne(Edges.dst).
MaybeInitializeExecutionState(state);
std::vector<ExpandOneResponse> responses;
auto &shard_cache_ref = state.shard_cache;
size_t id = 0;
// pending_requests on shards
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
const Label primary_label = state.requests[id].src_vertices[0].primary_label;
auto &storage_client = GetStorageClientForShard(*shard_it, primary_label.id);
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
// RETRY on timeouts?
// Sometimes this produces a timeout. A temporary solution is to use a while(true) loop, as was done in the shard_map test.
if (read_response_result.HasError()) {
throw std::runtime_error("ExpandOne request timedout");
}
if (read_response_result.GetValue().success == false) {
throw std::runtime_error("ExpandOne request did not succeed");
}
responses.push_back(read_response_result.GetValue());
}
return responses;
}
private:
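
As the comment block in Request() says, an ExpandOne response only resolves <U, Edges>; the caller must issue a second ExpandOne over Edges.dst to materialize the destination vertices and their properties. A hedged sketch of that caller-side pattern, using stand-in types (SimpleEdge and ExpandOneHop are illustrative names, not this commit's API):

#include <cstdint>
#include <vector>

// Stand-in: an ExpandOne hop returns the adjacent edges per source vertex,
// identified only by their endpoint ids.
struct SimpleEdge { int64_t src; int64_t dst; };

// Hypothetical wrapper over Request(ExecutionState<ExpandOneRequest> &).
std::vector<SimpleEdge> ExpandOneHop(const std::vector<int64_t> &src_vertices) {
  return {};  // stub; the real manager fans out per shard and collects responses
}

// Caller-side pattern for MATCH (u:L1 { prop : 1 })-[:Friend]-(v:L1):
std::vector<int64_t> DestinationsToRefetch(const std::vector<int64_t> &u_ids) {
  const auto edges = ExpandOneHop(u_ids);  // first hop: <U, Edges>
  std::vector<int64_t> dst_ids;
  dst_ids.reserve(edges.size());
  for (const auto &edge : edges) {
    dst_ids.push_back(edge.dst);  // collect Edges.dst
  }
  return dst_ids;  // feed into a second ExpandOneHop to fetch v and its properties
}
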
@@ -300,22 +302,23 @@ class ShardRequestManager : public ShardRequestManagerInterface {
std::map<Shard, ExpandOneRequest> per_shard_request_table;
MG_ASSERT(state.requests.size() == 1);
const auto &top_level_rqst = *state.requests.begin();
auto top_level_rqst = std::move(*state.requests.begin());
auto top_level_rqst_template = top_level_rqst;
top_level_rqst_template.src_vertices.clear();
top_level_rqst_template.edge_types.clear();
state.requests.clear();
size_t id = 0;
for (const auto &vertex : top_level_rqst.src_vertices) {
// auto shard = shards_map_.GetShardForKey(vertex.first.id, vertex.second);
// if (!per_shard_request_table.contains(shard)) {
// // Expensive copy, fix this.
// ExpandOneRequest expand_v_rqst = top_level_rqst;
// expand_v_rqst.src_vertices.clear();
// expand_v_rqst.edge_types.clear();
// expand_v_rqst.transaction_id = transaction_id_;
// per_shard_request_table.insert(std::pair(shard, std::move(expand_v_rqst)));
// state.shard_cache.push_back(shard);
auto shard = shards_map_.GetShardForKey(vertex.primary_label.id, vertex.primary_key);
if (!per_shard_request_table.contains(shard)) {
ExpandOneRequest expand_v_rqst = top_level_rqst_template;
per_shard_request_table.insert(std::pair(shard, std::move(expand_v_rqst)));
state.shard_cache.push_back(shard);
}
per_shard_request_table[shard].src_vertices.push_back(vertex);
per_shard_request_table[shard].edge_types.push_back(top_level_rqst.edge_types[id]);
++id;
}
// per_shard_request_table[shard].src_vertices.push_back(vertex);
// per_shard_request_table[shard].edge_types.push_back(top_level_rqst.edge_types[id]);
// ++id;
for (auto &[shard, rqst] : per_shard_request_table) {
state.requests.push_back(std::move(rqst));
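
To make the per-shard table above easier to follow, here is a reduced sketch of the same bucketing idea: clear the top-level request into a template, clone the template the first time a shard is seen, then append each source vertex to its shard's request (ShardId, MiniRequest, and the modulo key mapping are illustrative stand-ins for shards_map_.GetShardForKey):

#include <cstdint>
#include <map>
#include <vector>

using ShardId = int;
struct MiniRequest { std::vector<int64_t> src_vertices; };

// Toy key->shard mapping standing in for shards_map_.GetShardForKey.
ShardId GetShardForKey(int64_t key) { return static_cast<ShardId>(key % 4); }

std::map<ShardId, MiniRequest> BucketPerShard(const std::vector<int64_t> &keys,
                                              const MiniRequest &request_template) {
  std::map<ShardId, MiniRequest> per_shard;
  for (const auto key : keys) {
    const ShardId shard = GetShardForKey(key);
    if (!per_shard.contains(shard)) {
      per_shard.emplace(shard, request_template);  // first vertex for this shard
    }
    per_shard[shard].src_vertices.push_back(key);
  }
  return per_shard;
}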