Reduce the number of node infos to a maximum of one

This commit is contained in:
János Benjamin Antal 2023-01-16 08:57:23 +01:00
parent e40f7f507b
commit 392f6e2b73

View File

@ -170,9 +170,8 @@ uint64_t ComputeProfilingKey(const T *obj) {
class DistributedCreateNodeCursor : public Cursor {
public:
using InputOperator = std::shared_ptr<memgraph::query::v2::plan::LogicalOperator>;
DistributedCreateNodeCursor(const InputOperator &op, utils::MemoryResource *mem,
std::vector<const NodeCreationInfo *> nodes_info)
: input_cursor_(op->MakeCursor(mem)), nodes_info_(std::move(nodes_info)) {}
DistributedCreateNodeCursor(const InputOperator &op, utils::MemoryResource *mem, const NodeCreationInfo &node_info)
: input_cursor_(op->MakeCursor(mem)), node_info_(node_info) {}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("CreateNode");
@ -206,27 +205,78 @@ class DistributedCreateNodeCursor : public Cursor {
void PlaceNodeOnTheFrame(Frame &frame, ExecutionContext &context) {
// TODO(kostasrim) Make this work with batching
const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]};
const auto primary_label = msgs::Label{.id = node_info_.labels[0]};
msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])};
frame[nodes_info_.front()->symbol] =
frame[node_info_.symbol] =
TypedValue(query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.request_router));
}
std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) {
std::vector<msgs::NewVertex> requests;
// TODO(kostasrim) this assertion should be removed once we support multiple vertex creation
MG_ASSERT(nodes_info_.size() == 1);
msgs::PrimaryKey pk;
for (const auto &node_info : nodes_info_) {
msgs::NewVertex rqst;
MG_ASSERT(!node_info_.labels.empty(), "Cannot determine primary label");
const auto primary_label = node_info_.labels[0];
// TODO(jbajic) Fix properties not send,
// suggestion: ignore distinction between properties and primary keys
// since schema validation is done on storage side
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
storage::v3::View::NEW);
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info_.properties)) {
for (const auto &[key, value_expression] : *node_info_properties) {
TypedValue val = value_expression->Accept(evaluator);
if (context.request_router->IsPrimaryKey(primary_label, key)) {
rqst.primary_key.push_back(TypedValueToValue(val));
pk.push_back(TypedValueToValue(val));
}
}
} else {
auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info_.properties)).ValueMap();
for (const auto &[key, value] : property_map) {
auto key_str = std::string(key);
auto property_id = context.request_router->NameToProperty(key_str);
if (context.request_router->IsPrimaryKey(primary_label, property_id)) {
rqst.primary_key.push_back(TypedValueToValue(value));
pk.push_back(TypedValueToValue(value));
}
}
}
// TODO(kostasrim) Copy non primary labels as well
rqst.label_ids.push_back(msgs::Label{.id = primary_label});
src_vertex_props_.push_back(rqst.properties);
requests.push_back(std::move(rqst));
primary_keys_.push_back(std::move(pk));
return requests;
}
void PlaceNodesOnTheMultiFrame(MultiFrame &multi_frame, ExecutionContext &context) {
auto multi_frame_modifier = multi_frame.GetValidFramesModifier();
size_t i = 0;
MG_ASSERT(std::distance(multi_frame_modifier.begin(), multi_frame_modifier.end()));
for (auto &frame : multi_frame_modifier) {
const auto primary_label = msgs::Label{.id = node_info_.labels[0]};
msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[i])};
frame[node_info_.symbol] = TypedValue(
query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[i++], context.request_router));
}
}
std::vector<msgs::NewVertex> NodeCreationInfoToRequests(ExecutionContext &context, MultiFrame &multi_frame) {
std::vector<msgs::NewVertex> requests;
auto multi_frame_modifier = multi_frame.GetValidFramesModifier();
for (auto &frame : multi_frame_modifier) {
msgs::PrimaryKey pk;
msgs::NewVertex rqst;
MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label");
const auto primary_label = node_info->labels[0];
MG_ASSERT(!node_info_.labels.empty(), "Cannot determine primary label");
const auto primary_label = node_info_.labels[0];
// TODO(jbajic) Fix properties not send,
// suggestion: ignore distinction between properties and primary keys
// since schema validation is done on storage side
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
storage::v3::View::NEW);
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info_.properties)) {
for (const auto &[key, value_expression] : *node_info_properties) {
TypedValue val = value_expression->Accept(evaluator);
if (context.request_router->IsPrimaryKey(primary_label, key)) {
@ -235,7 +285,7 @@ class DistributedCreateNodeCursor : public Cursor {
}
}
} else {
auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info->properties)).ValueMap();
auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info_.properties)).ValueMap();
for (const auto &[key, value] : property_map) {
auto key_str = std::string(key);
auto property_id = context.request_router->NameToProperty(key_str);
@ -246,80 +296,19 @@ class DistributedCreateNodeCursor : public Cursor {
}
}
if (node_info->labels.empty()) {
throw QueryRuntimeException("Primary label must be defined!");
}
// TODO(kostasrim) Copy non primary labels as well
rqst.label_ids.push_back(msgs::Label{.id = primary_label});
src_vertex_props_.push_back(rqst.properties);
requests.push_back(std::move(rqst));
}
primary_keys_.push_back(std::move(pk));
return requests;
}
void PlaceNodesOnTheMultiFrame(MultiFrame &multi_frame, ExecutionContext &context) {
auto multi_frame_modifier = multi_frame.GetValidFramesModifier();
size_t i = 0;
MG_ASSERT(std::distance(multi_frame_modifier.begin(), multi_frame_modifier.end()));
for (auto &frame : multi_frame_modifier) {
const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]};
msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[i])};
frame[nodes_info_.front()->symbol] = TypedValue(
query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[i++], context.request_router));
}
}
std::vector<msgs::NewVertex> NodeCreationInfoToRequests(ExecutionContext &context, MultiFrame &multi_frame) {
std::vector<msgs::NewVertex> requests;
auto multi_frame_modifier = multi_frame.GetValidFramesModifier();
for (auto &frame : multi_frame_modifier) {
msgs::PrimaryKey pk;
for (const auto &node_info : nodes_info_) {
msgs::NewVertex rqst;
MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label");
const auto primary_label = node_info->labels[0];
// TODO(jbajic) Fix properties not send,
// suggestion: ignore distinction between properties and primary keys
// since schema validation is done on storage side
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
storage::v3::View::NEW);
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
for (const auto &[key, value_expression] : *node_info_properties) {
TypedValue val = value_expression->Accept(evaluator);
if (context.request_router->IsPrimaryKey(primary_label, key)) {
rqst.primary_key.push_back(TypedValueToValue(val));
pk.push_back(TypedValueToValue(val));
}
}
} else {
auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info->properties)).ValueMap();
for (const auto &[key, value] : property_map) {
auto key_str = std::string(key);
auto property_id = context.request_router->NameToProperty(key_str);
if (context.request_router->IsPrimaryKey(primary_label, property_id)) {
rqst.primary_key.push_back(TypedValueToValue(value));
pk.push_back(TypedValueToValue(value));
}
}
}
if (node_info->labels.empty()) {
throw QueryRuntimeException("Primary label must be defined!");
}
// TODO(kostasrim) Copy non primary labels as well
rqst.label_ids.push_back(msgs::Label{.id = primary_label});
src_vertex_props_.push_back(rqst.properties);
requests.push_back(std::move(rqst));
}
primary_keys_.push_back(std::move(pk));
}
return requests;
}
private:
const UniqueCursorPtr input_cursor_;
std::vector<const NodeCreationInfo *> nodes_info_;
NodeCreationInfo node_info_;
std::vector<std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>> src_vertex_props_;
std::vector<msgs::PrimaryKey> primary_keys_;
};
@ -364,7 +353,7 @@ ACCEPT_WITH_INPUT(CreateNode)
UniqueCursorPtr CreateNode::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::CreateNodeOperator);
return MakeUniqueCursorPtr<DistributedCreateNodeCursor>(mem, input_, mem, std::vector{&this->node_info_});
return MakeUniqueCursorPtr<DistributedCreateNodeCursor>(mem, input_, mem, this->node_info_);
}
std::vector<Symbol> CreateNode::ModifiedSymbols(const SymbolTable &table) const {