From 60b71cc2c107eb63882cd3c1cc9616cf204b32ea Mon Sep 17 00:00:00 2001 From: gvolfing Date: Tue, 31 Jan 2023 17:30:31 +0100 Subject: [PATCH] Rework ScanByPrimaryKey operator - multiframe --- src/query/v2/plan/operator.cpp | 158 ++++++++++++++------------------- 1 file changed, 65 insertions(+), 93 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index ad1674d29..20425de6a 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -635,83 +635,43 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return VertexAccessor(vertex, properties, &request_router); } - // TODO (gvolfing) optinal vs empty vector for signaling failure? - bool MakeRequestSingleFrameTwo(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) { - // Evaluate the expressions that hold the PrimaryKey. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, - storage::v3::View::NEW); + void MakeRequestMultiFrame(MultiFrame &multi_frame, RequestRouterInterface &request_router, + ExecutionContext &context) { + msgs::GetPropertiesRequest req; + const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; - std::vector pk; - for (auto *primary_property : primary_key_) { - pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + std::unordered_set used_vertex_ids; + + for (auto &frame : multi_frame.GetValidFramesModifier()) { + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); + + std::vector pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + auto vertex_id = std::make_pair(label, std::move(pk)); + auto [it, inserted] = used_vertex_ids.emplace(std::move(vertex_id)); + if (inserted) { + req.vertex_ids.emplace_back(*it); + } } - msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; - - msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}}; auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { SCOPED_REQUEST_WAIT_PROFILE; return request_router.GetProperties(req); }); - MG_ASSERT(get_prop_result.size() <= 1); - // { - // SCOPED_REQUEST_WAIT_PROFILE; - // std::optional request_label = std::nullopt; - // if (label_.has_value()) { - // request_label = context.request_router->LabelToName(*label_); - // } - // current_batch_ = context.request_router->ScanVertices(request_label); - // } - // current_vertex_it_ = current_batch_.begin(); - // return !current_batch_.empty( + for (auto &result : get_prop_result) { + auto properties = result.props; + // TODO (gvolfing) figure out labels when relevant. + msgs::Vertex vertex = {.id = result.vertex, .labels = {}}; - if (get_prop_result.empty()) { - // return std::nullopt; - return false; + id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(vertex, properties, &request_router)); } - - auto properties = get_prop_result[0].props; - // TODO (gvolfing) figure out labels when relevant. - msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}}; - - current_batch_ = {VertexAccessor(vertex, properties, &request_router)}; - current_vertex_it_ = current_batch_.begin(); - return current_batch_.empty(); } - // std::vector MakeRequest(Frame &frame, RequestRouterInterface &request_router, - // ExecutionContext &context) { - // std::vector ret; - // // Evaluate the expressions that hold the PrimaryKey. - // ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, - // storage::v3::View::NEW); - - // std::vector pk; - // for (auto *primary_property : primary_key_) { - // pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); - // } - - // msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; - - // msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}}; - // auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { - // SCOPED_REQUEST_WAIT_PROFILE; - // return request_router.GetProperties(req); - // }); - // MG_ASSERT(get_prop_result.size() <= 1); - - // if (get_prop_result.empty()) { - // return ret; - // } - // auto properties = get_prop_result[0].props; - // // TODO (gvolfing) figure out labels when relevant. - // msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}}; - // auto va = VertexAccessor(vertex, properties, &request_router); - // ret.push_back(va); - // return ret; - // } - bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); @@ -730,15 +690,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return false; } - bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { - SCOPED_PROFILE_OP(op_name_); - + void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) { if (!own_multi_frame_.has_value()) { own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource())); own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); own_frames_it_ = own_frames_consumer_->begin(); } + MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size()); + } + + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP(op_name_); + EnsureOwnMultiFrameIsGood(output_multi_frame); auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); auto populated_any = false; @@ -746,49 +710,58 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { while (true) { switch (state_) { case State::PullInput: { + id_to_accessor_mapping_.clear(); if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { state_ = State::Exhausted; return populated_any; } own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); own_frames_it_ = own_frames_consumer_->begin(); - state_ = State::FetchVertices; - break; - } - case State::FetchVertices: { + if (own_frames_it_ == own_frames_consumer_->end()) { - state_ = State::PullInput; continue; } - if (!filter_expressions_->empty() || current_batch_.empty()) { - // MakeRequest(context); - MakeRequestSingleFrameTwo(*own_frames_it_, *context.request_router, context); - } else { - // We can reuse the vertices as they don't depend on any value from the frames - current_vertex_it_ = current_batch_.begin(); - } + + MakeRequestMultiFrame(*own_multi_frame_, *context.request_router, context); + state_ = State::PopulateOutput; break; } case State::PopulateOutput: { if (!output_multi_frame.HasInvalidFrame()) { + if (own_frames_it_ == own_frames_consumer_->end()) { + id_to_accessor_mapping_.clear(); + } return populated_any; } - if (current_vertex_it_ == current_batch_.end()) { - own_frames_it_->MakeInvalid(); - ++own_frames_it_; - state_ = State::FetchVertices; + + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; continue; } for (auto output_frame_it = output_frames_populator.begin(); - output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end(); - ++output_frame_it) { + output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end(); + ++own_frames_it_) { auto &output_frame = *output_frame_it; - output_frame = *own_frames_it_; - output_frame[output_symbol_] = TypedValue(*current_vertex_it_); - current_vertex_it_++; - populated_any = true; + + ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context, + context.request_router, storage::v3::View::NEW); + + std::vector pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + auto vertex_id = std::make_pair(label, std::move(pk)); + + if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) { + output_frame = *own_frames_it_; + output_frame[output_symbol_] = TypedValue(it->second); + populated_any = true; + ++output_frame_it; + } } break; } @@ -805,20 +778,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { void Shutdown() override { input_cursor_->Shutdown(); } private: - enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted }; + enum class State { PullInput, PopulateOutput, Exhausted }; State state_{State::PullInput}; const Symbol output_symbol_; const UniqueCursorPtr input_cursor_; const char *op_name_; - std::vector current_batch_; - std::vector::iterator current_vertex_it_{current_batch_.begin()}; storage::v3::LabelId label_; std::optional> filter_expressions_; std::vector primary_key_; std::optional own_multi_frame_; std::optional own_frames_consumer_; ValidFramesConsumer::Iterator own_frames_it_; + std::unordered_map id_to_accessor_mapping_; }; ScanAll::ScanAll(const std::shared_ptr &input, Symbol output_symbol, storage::v3::View view)