diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index aa220d764..fe7a17bf7 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -812,7 +812,8 @@ std::optional PullPlan::PullMultiple(AnyStrea std::optional PullPlan::Pull(AnyStream *stream, std::optional n, const std::vector &output_symbols, std::map *summary) { - auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple + // auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple + auto should_pull_multiple = true; if (should_pull_multiple) { return PullMultiple(stream, n, output_symbols, summary); } diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 50e5d4fca..c9bd2e9d0 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -603,8 +603,6 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { filter_expressions_(filter_expressions), primary_key_(primary_key) {} - enum class State : int8_t { INITIALIZING, COMPLETED }; - using VertexAccessor = accessors::VertexAccessor; std::optional MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router, @@ -637,6 +635,83 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return VertexAccessor(vertex, properties, &request_router); } + // TODO (gvolfing) optinal vs empty vector for signaling failure? + bool MakeRequestSingleFrameTwo(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) { + // Evaluate the expressions that hold the PrimaryKey. + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); + + std::vector pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + + msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}}; + auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { + SCOPED_REQUEST_WAIT_PROFILE; + return request_router.GetProperties(req); + }); + MG_ASSERT(get_prop_result.size() <= 1); + + // { + // SCOPED_REQUEST_WAIT_PROFILE; + // std::optional request_label = std::nullopt; + // if (label_.has_value()) { + // request_label = context.request_router->LabelToName(*label_); + // } + // current_batch_ = context.request_router->ScanVertices(request_label); + // } + // current_vertex_it_ = current_batch_.begin(); + // return !current_batch_.empty( + + if (get_prop_result.empty()) { + // return std::nullopt; + return false; + } + + auto properties = get_prop_result[0].props; + // TODO (gvolfing) figure out labels when relevant. + msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}}; + + current_batch_ = {VertexAccessor(vertex, properties, &request_router)}; + current_vertex_it_ = current_batch_.begin(); + return current_batch_.empty(); + } + + // std::vector MakeRequest(Frame &frame, RequestRouterInterface &request_router, + // ExecutionContext &context) { + // std::vector ret; + // // Evaluate the expressions that hold the PrimaryKey. + // ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + // storage::v3::View::NEW); + + // std::vector pk; + // for (auto *primary_property : primary_key_) { + // pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + // } + + // msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + + // msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}}; + // auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { + // SCOPED_REQUEST_WAIT_PROFILE; + // return request_router.GetProperties(req); + // }); + // MG_ASSERT(get_prop_result.size() <= 1); + + // if (get_prop_result.empty()) { + // return ret; + // } + // auto properties = get_prop_result[0].props; + // // TODO (gvolfing) figure out labels when relevant. + // msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}}; + // auto va = VertexAccessor(vertex, properties, &request_router); + // ret.push_back(va); + // return ret; + // } + bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); @@ -655,17 +730,95 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return false; } + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP(op_name_); + + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + auto populated_any = false; + + while (true) { + switch (state_) { + case State::PullInput: { + if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { + state_ = State::Exhausted; + return populated_any; + } + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + state_ = State::FetchVertices; + break; + } + case State::FetchVertices: { + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; + continue; + } + if (!filter_expressions_->empty() || current_batch_.empty()) { + // MakeRequest(context); + MakeRequestSingleFrameTwo(*own_frames_it_, *context.request_router, context); + } else { + // We can reuse the vertices as they don't depend on any value from the frames + current_vertex_it_ = current_batch_.begin(); + } + state_ = State::PopulateOutput; + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + return populated_any; + } + if (current_vertex_it_ == current_batch_.end()) { + own_frames_it_->MakeInvalid(); + ++own_frames_it_; + state_ = State::FetchVertices; + continue; + } + + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end(); + ++output_frame_it) { + auto &output_frame = *output_frame_it; + output_frame = *own_frames_it_; + output_frame[output_symbol_] = TypedValue(*current_vertex_it_); + current_vertex_it_++; + populated_any = true; + } + break; + } + case State::Exhausted: { + return populated_any; + } + } + } + return populated_any; + }; + void Reset() override { input_cursor_->Reset(); } void Shutdown() override { input_cursor_->Shutdown(); } private: + enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted }; + + State state_{State::PullInput}; const Symbol output_symbol_; const UniqueCursorPtr input_cursor_; const char *op_name_; + std::vector current_batch_; + std::vector::iterator current_vertex_it_{current_batch_.begin()}; storage::v3::LabelId label_; std::optional> filter_expressions_; std::vector primary_key_; + std::optional own_multi_frame_; + std::optional own_frames_consumer_; + ValidFramesConsumer::Iterator own_frames_it_; }; ScanAll::ScanAll(const std::shared_ptr &input, Symbol output_symbol, storage::v3::View view)