Add missing glue code on DistributedCreateVertices
This commit is contained in:
parent
c182cf8384
commit
632f0398b5
@ -32,6 +32,8 @@ Value EdgeAccessor::GetProperty(const std::string &prop_name) const {
|
||||
return properties[prop_name];
|
||||
}
|
||||
|
||||
requests::Edge EdgeAccessor::GetEdge() const { return edge; }
|
||||
|
||||
VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
|
||||
|
||||
VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); }
|
||||
@ -69,4 +71,6 @@ Value VertexAccessor::GetProperty(const std::string &prop_name) const {
|
||||
// return ValueToTypedValue(properties[prop_name]);
|
||||
}
|
||||
|
||||
requests::Vertex VertexAccessor::GetVertex() const { return vertex; }
|
||||
|
||||
} // namespace memgraph::query::v2::accessors
|
||||
|
@ -40,6 +40,8 @@ class EdgeAccessor final {
|
||||
|
||||
Value GetProperty(const std::string &prop_name) const;
|
||||
|
||||
requests::Edge GetEdge() const;
|
||||
|
||||
// Dummy function
|
||||
inline size_t CypherId() const { return 10; }
|
||||
|
||||
@ -74,6 +76,8 @@ class VertexAccessor final {
|
||||
Value GetProperty(PropertyId prop_name) const;
|
||||
Value GetProperty(const std::string &prop_name) const;
|
||||
|
||||
requests::Vertex GetVertex() const;
|
||||
|
||||
// Dummy function
|
||||
inline size_t CypherId() const { return 10; }
|
||||
|
||||
|
@ -16,9 +16,8 @@
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
|
||||
using requests::Value;
|
||||
|
||||
inline TypedValue ValueToTypedValue(const Value &value) {
|
||||
inline TypedValue ValueToTypedValue(const requests::Value &value) {
|
||||
using Value = requests::Value;
|
||||
switch (value.type) {
|
||||
case Value::NILL:
|
||||
return {};
|
||||
@ -30,10 +29,23 @@ inline TypedValue ValueToTypedValue(const Value &value) {
|
||||
return TypedValue(value.double_v);
|
||||
case Value::STRING:
|
||||
return TypedValue(value.string_v);
|
||||
case Value::LIST:
|
||||
// return TypedValue(value.list_v);
|
||||
case Value::MAP:
|
||||
// return TypedValue(value.map_v);
|
||||
case Value::LIST: {
|
||||
const auto &lst = value.list_v;
|
||||
std::vector<TypedValue> dst;
|
||||
dst.reserve(lst.size());
|
||||
for (const auto &elem : lst) {
|
||||
dst.push_back(ValueToTypedValue(elem));
|
||||
}
|
||||
return TypedValue(std::move(dst));
|
||||
}
|
||||
case Value::MAP: {
|
||||
const auto &value_map = value.map_v;
|
||||
std::map<std::string, TypedValue> dst;
|
||||
for (const auto &[key, val] : value_map) {
|
||||
dst[key] = ValueToTypedValue(val);
|
||||
}
|
||||
return TypedValue(std::move(dst));
|
||||
}
|
||||
case Value::VERTEX:
|
||||
return TypedValue(accessors::VertexAccessor(value.vertex_v, {}));
|
||||
case Value::EDGE:
|
||||
@ -44,4 +56,45 @@ inline TypedValue ValueToTypedValue(const Value &value) {
|
||||
throw std::runtime_error("Incorrect type in conversion");
|
||||
}
|
||||
|
||||
inline requests::Value TypedValueToValue(const TypedValue &value) {
|
||||
using Value = requests::Value;
|
||||
switch (value.type()) {
|
||||
case TypedValue::Type::Null:
|
||||
return {};
|
||||
case TypedValue::Type::Bool:
|
||||
return Value(value.ValueBool());
|
||||
case TypedValue::Type::Int:
|
||||
return Value(value.ValueInt());
|
||||
case TypedValue::Type::Double:
|
||||
return Value(value.ValueDouble());
|
||||
case TypedValue::Type::String:
|
||||
return Value(std::string(value.ValueString()));
|
||||
case TypedValue::Type::List: {
|
||||
const auto &lst = value.ValueList();
|
||||
std::vector<Value> dst;
|
||||
dst.reserve(lst.size());
|
||||
for (const auto &elem : lst) {
|
||||
dst.push_back(TypedValueToValue(elem));
|
||||
}
|
||||
return Value(std::move(dst));
|
||||
}
|
||||
case TypedValue::Type::Map: {
|
||||
const auto &value_map = value.ValueMap();
|
||||
std::map<std::string, Value> dst;
|
||||
for (const auto &[key, val] : value_map) {
|
||||
dst[std::string(key)] = TypedValueToValue(val);
|
||||
}
|
||||
return Value(std::move(dst));
|
||||
}
|
||||
case TypedValue::Type::Vertex:
|
||||
return Value(value.ValueVertex().GetVertex());
|
||||
case TypedValue::Type::Edge:
|
||||
return Value(value.ValueEdge().GetEdge());
|
||||
case TypedValue::Type::Path:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
throw std::runtime_error("Incorrect type in conversion");
|
||||
}
|
||||
|
||||
} // namespace memgraph::query::v2
|
||||
|
@ -4211,7 +4211,7 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
SCOPED_PROFILE_OP("CreateNode");
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
auto &shard_manager = context.shard_request_manager;
|
||||
shard_manager->Request(state_, NodeCreationInfoToRequest());
|
||||
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -4222,9 +4222,41 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
|
||||
void Reset() override { state_ = {}; }
|
||||
|
||||
std::vector<requests::NewVertexLabel> NodeCreationInfoToRequest() const {
|
||||
// TODO(kostasrim) Add the conversion
|
||||
return {};
|
||||
std::vector<requests::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) const {
|
||||
std::vector<requests::NewVertex> requests;
|
||||
for (const auto &node_info : nodes_info_) {
|
||||
requests::NewVertex rqst;
|
||||
std::map<requests::PropertyId, requests::Value> properties;
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
|
||||
storage::v3::View::NEW);
|
||||
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info.properties)) {
|
||||
for (const auto &[key, value_expression] : *node_info_properties) {
|
||||
TypedValue val = value_expression->Accept(evaluator);
|
||||
properties[key] = TypedValueToValue(val);
|
||||
if (context.shard_request_manager->IsPrimaryKey(key)) {
|
||||
rqst.primary_key.push_back(storage::v3::TypedToPropertyValue(val));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info.properties)).ValueMap();
|
||||
for (const auto &[key, value] : property_map) {
|
||||
auto key_str = std::string(key);
|
||||
auto property_id = context.shard_request_manager->NameToProperty(key_str);
|
||||
properties[property_id] = TypedValueToValue(value);
|
||||
if (context.shard_request_manager->IsPrimaryKey(property_id)) {
|
||||
rqst.primary_key.push_back(storage::v3::TypedToPropertyValue(value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (node_info.labels.empty()) {
|
||||
throw QueryRuntimeException("Primary label must be defined!");
|
||||
}
|
||||
rqst.label_ids = requests::Label{node_info.labels[0]};
|
||||
// std::vector<storage::v3::LabelId> secondary_labels(node_info.labels.begin() + 1, node_info.labels.end());
|
||||
requests.push_back(std::move(rqst));
|
||||
}
|
||||
return requests;
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -272,6 +272,7 @@ class RuleBasedPlanner {
|
||||
PropertiesMapList vector_props;
|
||||
vector_props.reserve(node_properties->size());
|
||||
for (const auto &kv : *node_properties) {
|
||||
// TODO(kostasrim) GetProperty should be implemented in terms of ShardRequestManager NameToProperty
|
||||
vector_props.push_back({GetProperty(kv.first), kv.second});
|
||||
}
|
||||
return std::move(vector_props);
|
||||
|
@ -95,9 +95,12 @@ class ShardRequestManagerInterface {
|
||||
virtual void StartTransaction() = 0;
|
||||
virtual std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) = 0;
|
||||
virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
|
||||
std::vector<requests::NewVertexLabel> new_vertices) = 0;
|
||||
std::vector<requests::NewVertex> new_vertices) = 0;
|
||||
virtual std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) = 0;
|
||||
virtual ~ShardRequestManagerInterface() {}
|
||||
virtual memgraph::storage::v3::PropertyId NameToProperty(const std::string &name) const = 0;
|
||||
virtual memgraph::storage::v3::LabelId LabelNameToLabelId(const std::string &name) const = 0;
|
||||
virtual bool IsPrimaryKey(const PropertyId name) const = 0;
|
||||
ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete;
|
||||
ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete;
|
||||
};
|
||||
@ -144,6 +147,19 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
shards_map_ = hlc_response.fresher_shard_map.value();
|
||||
}
|
||||
|
||||
memgraph::storage::v3::PropertyId NameToProperty(const std::string &name) const override {
|
||||
return *shards_map_.GetPropertyId(name);
|
||||
}
|
||||
|
||||
memgraph::storage::v3::LabelId LabelNameToLabelId(const std::string &name) const override {
|
||||
return shards_map_.GetLabelId(name);
|
||||
}
|
||||
|
||||
bool IsPrimaryKey(const PropertyId name) const override {
|
||||
return std::find_if(shards_map_.properties.begin(), shards_map_.properties.end(),
|
||||
[name](auto &pr) { return pr.second == name; }) != shards_map_.properties.end();
|
||||
}
|
||||
|
||||
// TODO(kostasrim) Simplify return result
|
||||
std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override {
|
||||
MaybeInitializeExecutionState(state);
|
||||
@ -152,7 +168,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
size_t id = 0;
|
||||
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
|
||||
auto &storage_client = GetStorageClientForShard(*state.label, state.requests[id].start_id.primary_key);
|
||||
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture instead.
|
||||
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture
|
||||
// instead.
|
||||
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
|
||||
// RETRY on timeouts?
|
||||
// Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test
|
||||
@ -179,7 +196,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
}
|
||||
|
||||
std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
|
||||
std::vector<NewVertexLabel> new_vertices) override {
|
||||
std::vector<NewVertex> new_vertices) override {
|
||||
MG_ASSERT(!new_vertices.empty());
|
||||
MaybeInitializeExecutionState(state, new_vertices);
|
||||
std::vector<CreateVerticesResponse> responses;
|
||||
@ -266,7 +283,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
}
|
||||
|
||||
void MaybeInitializeExecutionState(ExecutionState<CreateVerticesRequest> &state,
|
||||
std::vector<NewVertexLabel> new_vertices) {
|
||||
std::vector<NewVertex> new_vertices) {
|
||||
ThrowIfStateCompleted(state);
|
||||
if (ShallNotInitializeState(state)) {
|
||||
return;
|
||||
@ -276,16 +293,13 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
std::map<Shard, CreateVerticesRequest> per_shard_request_table;
|
||||
|
||||
for (auto &new_vertex : new_vertices) {
|
||||
auto shard = shards_map_.GetShardForKey(new_vertex.label, new_vertex.primary_key);
|
||||
auto shard = shards_map_.GetShardForKey(new_vertex.label_ids.id, new_vertex.primary_key);
|
||||
if (!per_shard_request_table.contains(shard)) {
|
||||
CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_};
|
||||
per_shard_request_table.insert(std::pair(shard, std::move(create_v_rqst)));
|
||||
state.shard_cache.push_back(shard);
|
||||
}
|
||||
per_shard_request_table[shard].new_vertices.push_back(
|
||||
NewVertex{.label_ids = {shards_map_.GetLabelId(new_vertex.label)},
|
||||
.primary_key = std::move(new_vertex.primary_key),
|
||||
.properties = std::move(new_vertex.properties)});
|
||||
per_shard_request_table[shard].new_vertices.push_back(std::move(new_vertex));
|
||||
}
|
||||
|
||||
for (auto &[shard, rqst] : per_shard_request_table) {
|
||||
|
@ -180,9 +180,10 @@ void TestCreateVertices(ShardRequestManager &io) {
|
||||
std::cout << "Testing Create" << std::endl;
|
||||
using PropVal = memgraph::storage::v3::PropertyValue;
|
||||
requests::ExecutionState<CreateVerticesRequest> state;
|
||||
std::vector<NewVertexLabel> new_vertices;
|
||||
NewVertexLabel a1{.label = "test_label", .primary_key = {PropVal(1), PropVal(0)}};
|
||||
NewVertexLabel a2{.label = "test_label", .primary_key = {PropVal(13), PropVal(13)}};
|
||||
std::vector<requests::NewVertex> new_vertices;
|
||||
auto label_id = io.LabelNameToLabelId("test_label");
|
||||
requests::NewVertex a1{.label_ids = label_id, .primary_key = {PropVal(1), PropVal(0)}};
|
||||
requests::NewVertex a2{.label_ids = label_id, .primary_key = {PropVal(13), PropVal(13)}};
|
||||
new_vertices.push_back(std::move(a1));
|
||||
new_vertices.push_back(std::move(a2));
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user