Added accessors prototype, and ExpandOne request boilerplate
This commit is contained in:
parent
a2b04db23c
commit
e7d9ab1c5f
src
@ -171,6 +171,17 @@ struct ShardMap {
|
||||
return std::prev(label_space.shards.upper_bound(key))->second;
|
||||
}
|
||||
|
||||
Shard GetShardForKey(const LabelId &label_id, const CompoundKey &key) const {
|
||||
MG_ASSERT(label_spaces.contains(label_id));
|
||||
|
||||
const auto &label_space = label_spaces.at(label_id);
|
||||
|
||||
MG_ASSERT(label_space.shards.begin()->first <= key,
|
||||
"the ShardMap must always contain a minimal key that is less than or equal to any requested key");
|
||||
|
||||
return std::prev(label_space.shards.upper_bound(key))->second;
|
||||
}
|
||||
|
||||
PropertyMap AllocatePropertyIds(const std::vector<PropertyName> &new_properties) {
|
||||
PropertyMap ret{};
|
||||
|
||||
|
167
src/query/v2/accessors.hpp
Normal file
167
src/query/v2/accessors.hpp
Normal file
@ -0,0 +1,167 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "storage/v3/view.hpp"
|
||||
#include "utils/bound.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
TypedValue ValueToTypedValue(const Value &value) {
|
||||
switch (value.type) {
|
||||
case Value::NILL:
|
||||
return {};
|
||||
case Value::BOOL:
|
||||
return {value.bool_v};
|
||||
case Value::INT64:
|
||||
return {value.int_v};
|
||||
case Value::DOUBLE:
|
||||
return {value.double_v};
|
||||
case Value::STRING:
|
||||
return {value.string_v};
|
||||
case Value::LIST:
|
||||
return {value.list_v};
|
||||
case Value::MAP:
|
||||
return {value.map_v};
|
||||
case Value::VERTEX:
|
||||
case Value::EDGE:
|
||||
case Value::PATH:
|
||||
}
|
||||
std::runtime_error("Incorrect type in conversion");
|
||||
}
|
||||
|
||||
class VertexAccessor;
|
||||
|
||||
class EdgeAccessor final {
|
||||
public:
|
||||
EdgeAccessor(Edge *edge, std::map<std::string, Value> *props) : edge(edge), properties(props) {
|
||||
MG_ASSERT(edge != nullptr);
|
||||
MG_ASSERT(properties != nullptr);
|
||||
}
|
||||
|
||||
std::string EdgeType() const { return edge->type.name; }
|
||||
|
||||
std::map<std::string, TypedValue> Properties() const {
|
||||
std::map<std::string, TypedValue> res;
|
||||
for (const auto &[name, value] : *properties) {
|
||||
res[name] = ValueToTypedValue(value);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
TypedValue GetProperty(const std::string &prop_name) const {
|
||||
MG_ASSERT(properties->contains(prop_name));
|
||||
return ValueToTypedValue(properties[prop_name]);
|
||||
}
|
||||
|
||||
// bool HasSrcAccessor const { return src == nullptr; }
|
||||
// bool HasDstAccessor const { return dst == nullptr; }
|
||||
|
||||
// VertexAccessor To() const;
|
||||
// VertexAccessor From() const;
|
||||
|
||||
friend bool operator==(const EdgeAccessor &lhs, const EdgeAccessor &rhs) noexcept {
|
||||
return lhs.edge == rhs.edge && lhs.properties == rhs.properties;
|
||||
}
|
||||
|
||||
friend bool operator!=(const EdgeAccessor &lhs, const EdgeAccessor &rhs) noexcept { return !(lhs == rhs); }
|
||||
|
||||
private:
|
||||
Edge *edge;
|
||||
// VertexAccessor *src {nullptr};
|
||||
// VertexAccessor *dst {nullptr};
|
||||
std::map<std::string, Value> *properties;
|
||||
};
|
||||
|
||||
class VertexAccessor final {
|
||||
public:
|
||||
VertexAccessor(Vertex *v, std::map<std::string, Value> *props) : vertex(v), properties(props) {
|
||||
MG_ASSERT(vertex != nullptr);
|
||||
MG_ASSERT(properties != nullptr);
|
||||
}
|
||||
|
||||
std::vector<Label> Labels() const { return vertex->labels; }
|
||||
|
||||
bool HasLabel(Label &label) const {
|
||||
return std::find_if(vertex->labels.begin(), vertex->labels.end(),
|
||||
[label](const auto &l) { return l.id == label.id; }) != vertex->labels.end();
|
||||
}
|
||||
|
||||
std::map<std::string, TypedValue> Properties() const {
|
||||
std::map<std::string, TypedValue> res;
|
||||
for (const auto &[name, value] : *properties) {
|
||||
res[name] = ValueToTypedValue(value);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
TypedValue GetProperty(const std::string &prop_name) const {
|
||||
MG_ASSERT(properties->contains(prop_name));
|
||||
return ValueToTypedValue(properties[prop_name]);
|
||||
}
|
||||
|
||||
// auto InEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types) const
|
||||
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.InEdges(view)))> {
|
||||
// auto maybe_edges = impl_.InEdges(view, edge_types);
|
||||
// if (maybe_edges.HasError()) return maybe_edges.GetError();
|
||||
// return iter::imap(MakeEdgeAccessor, std::move(*maybe_edges));
|
||||
// }
|
||||
//
|
||||
// auto InEdges(storage::View view) const { return InEdges(view, {}); }
|
||||
//
|
||||
// auto InEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types, const VertexAccessor &dest)
|
||||
// const
|
||||
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.InEdges(view)))> {
|
||||
// auto maybe_edges = impl_.InEdges(view, edge_types, &dest.impl_);
|
||||
// if (maybe_edges.HasError()) return maybe_edges.GetError();
|
||||
// return iter::imap(MakeEdgeAccessor, std::move(*maybe_edges));
|
||||
// }
|
||||
//
|
||||
// auto OutEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types) const
|
||||
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.OutEdges(view)))> {
|
||||
// auto maybe_edges = impl_.OutEdges(view, edge_types);
|
||||
// if (maybe_edges.HasError()) return maybe_edges.GetError();
|
||||
// return iter::imap(MakeEdgeAccessor, std::move(*maybe_edges));
|
||||
// }
|
||||
//
|
||||
// auto OutEdges(storage::View view) const { return OutEdges(view, {}); }
|
||||
//
|
||||
// auto OutEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types,
|
||||
// const VertexAccessor &dest) const
|
||||
// -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.OutEdges(view)))> {
|
||||
// auto maybe_edges = impl_.OutEdges(view, edge_types, &dest.impl_);
|
||||
// if (maybe_edges.HasError()) return maybe_edges.GetError();
|
||||
// return iter::imap(MakeEdgeAccessor, std::move(*maybe_edges));
|
||||
// }
|
||||
|
||||
// storage::Result<size_t> InDegree(storage::View view) const { return impl_.InDegree(view); }
|
||||
//
|
||||
// storage::Result<size_t> OutDegree(storage::View view) const { return impl_.OutDegree(view); }
|
||||
//
|
||||
|
||||
friend bool operator==(const VertexAccessor lhs, const VertexAccessor &rhs) noexcept {
|
||||
return lhs.vertex == rhs.vertex && lhs.properties == rhs.properties;
|
||||
}
|
||||
|
||||
friend bool operator!=(const VertexAccessor lhs, const VertexAccessor &rhs) noexcept { return !(lhs == rhs); }
|
||||
|
||||
private:
|
||||
Vertex *vertex;
|
||||
std::map<std::string, Value> *properties;
|
||||
};
|
||||
|
||||
//inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
|
||||
|
||||
//inline VertexAccessor EdgeAccessor::From() const { return VertexAccessor(impl_.FromVertex()); }
|
@ -73,17 +73,19 @@ struct Value {
|
||||
union {
|
||||
Null null_v;
|
||||
bool bool_v;
|
||||
uint64_t int_v;
|
||||
int int_v;
|
||||
double double_v;
|
||||
// std::string string_v;
|
||||
// std::vector<Value> list_v;
|
||||
// std::map<std::string, Value> map_v;
|
||||
// Vertex vertex_v;
|
||||
// Edge edge_v;
|
||||
// Path path_v;
|
||||
std::string string_v;
|
||||
std::vector<Value> list_v;
|
||||
std::map<std::string, Value> map_v;
|
||||
Vertex vertex_v;
|
||||
Edge edge_v;
|
||||
Path path_v;
|
||||
};
|
||||
|
||||
Type type;
|
||||
|
||||
Value() : type(NILL), null_v{} {}
|
||||
};
|
||||
|
||||
struct ValuesMap {
|
||||
@ -152,6 +154,11 @@ struct GetPropertiesResponse {
|
||||
|
||||
enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };
|
||||
|
||||
struct VertexEdgeId {
|
||||
VertexId vertex_id;
|
||||
std::optional<EdgeId> next_id;
|
||||
};
|
||||
|
||||
struct ExpandOneRequest {
|
||||
Hlc transaction_id;
|
||||
std::vector<VertexId> src_vertices;
|
||||
|
@ -137,9 +137,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
std::vector<ScanVerticesResponse> responses;
|
||||
auto &shard_cacheref = state.shard_cache;
|
||||
size_t id = 0;
|
||||
std::cout << "ScanVerticesRequest" << std::endl;
|
||||
for (auto shard_it = shard_cacheref.begin(); shard_it != shard_cacheref.end(); ++id) {
|
||||
std::cout << "ScanVerticesResponse" << std::endl;
|
||||
auto &storage_client = GetStorageClientForShard(*state.label, state.requests[id].start_id.second);
|
||||
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture instead.
|
||||
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
|
||||
@ -173,6 +171,35 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
std::vector<CreateVerticesResponse> responses;
|
||||
auto &shard_cache_ref = state.shard_cache;
|
||||
size_t id = 0;
|
||||
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
|
||||
// This is fine because all new_vertices of each request end up on the same shard
|
||||
const Label label = state.requests[id].new_vertices[0].label_ids;
|
||||
auto primary_key = state.requests[id].new_vertices[0].primary_key;
|
||||
auto &storage_client = GetStorageClientForShard(*shard_it, label.id);
|
||||
auto write_response_result = storage_client.SendWriteRequest(state.requests[id]);
|
||||
// RETRY on timeouts?
|
||||
// Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test
|
||||
if (write_response_result.HasError()) {
|
||||
throw std::runtime_error("Write request error");
|
||||
}
|
||||
if (write_response_result.GetValue().success == false) {
|
||||
throw std::runtime_error("Write request did not succeed");
|
||||
}
|
||||
responses.push_back(write_response_result.GetValue());
|
||||
shard_it = shard_cache_ref.erase(shard_it);
|
||||
}
|
||||
// We are done with this state
|
||||
MaybeCompleteState(state);
|
||||
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
|
||||
// result of storage_client.SendReadRequest()).
|
||||
return responses;
|
||||
}
|
||||
|
||||
std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) {
|
||||
MaybeInitializeExecutionState(state);
|
||||
std::vector<ExpandOneResponse> responses;
|
||||
auto &shard_cache_ref = state.shard_cache;
|
||||
size_t id = 0;
|
||||
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
|
||||
// This is fine because all new_vertices of each request end up on the same shard
|
||||
const Label label = state.requests[id].new_vertices[0].label_ids;
|
||||
@ -190,10 +217,6 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
responses.push_back(read_response_result.GetValue());
|
||||
shard_it = shard_cache_ref.erase(shard_it);
|
||||
}
|
||||
// We are done with this state
|
||||
MaybeCompleteState(state);
|
||||
// TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return
|
||||
// result of storage_client.SendReadRequest()).
|
||||
return responses;
|
||||
}
|
||||
|
||||
@ -263,8 +286,37 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
|
||||
}
|
||||
|
||||
// std::vector<storageclient> GetStorageClientFromShardforRange(const std::string &label, const CompoundKey &start,
|
||||
// const CompoundKey &end);
|
||||
void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state) {
|
||||
ThrowIfStateCompleted(state);
|
||||
if (ShallNotInitializeState(state)) {
|
||||
return;
|
||||
}
|
||||
state.transaction_id = transaction_id_;
|
||||
|
||||
std::map<Shard, ExpandOneRequest> per_shard_request_table;
|
||||
MG_ASSERT(state.requests.size() == 1);
|
||||
const auto &top_level_rqst = *state.requests.begin();
|
||||
size_t id = 0;
|
||||
for (const auto &vertex : top_level_rqst.src_vertices) {
|
||||
// auto shard = shards_map_.GetShardForKey(vertex.first.id, vertex.second);
|
||||
// if (!per_shard_request_table.contains(shard)) {
|
||||
// // Expensive copy, fix this.
|
||||
// ExpandOneRequest expand_v_rqst = top_level_rqst;
|
||||
// expand_v_rqst.src_vertices.clear();
|
||||
// expand_v_rqst.edge_types.clear();
|
||||
// expand_v_rqst.transaction_id = transaction_id_;
|
||||
// per_shard_request_table.insert(std::pair(shard, std::move(expand_v_rqst)));
|
||||
// state.shard_cache.push_back(shard);
|
||||
}
|
||||
// per_shard_request_table[shard].src_vertices.push_back(vertex);
|
||||
// per_shard_request_table[shard].edge_types.push_back(top_level_rqst.edge_types[id]);
|
||||
// ++id;
|
||||
|
||||
for (auto &[shard, rqst] : per_shard_request_table) {
|
||||
state.requests.push_back(std::move(rqst));
|
||||
}
|
||||
state.state = ExecutionState<ExpandOneRequest>::EXECUTING;
|
||||
}
|
||||
|
||||
StorageClient &GetStorageClientForShard(Shard shard, LabelId label_id) {
|
||||
if (!storage_cli_manager_.Exists(label_id, shard)) {
|
||||
|
Loading…
Reference in New Issue
Block a user