From 57690c5390b0ba6e59ffe9410a0b95c07681239c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Tue, 17 Jan 2023 08:34:08 +0100 Subject: [PATCH] Refactor `DistributedScanAllAndFilterCursor` --- src/query/v2/plan/operator.cpp | 81 +++++++++++++++------------------- 1 file changed, 36 insertions(+), 45 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 1e84eaae5..9841791e6 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -457,14 +457,14 @@ class DistributedScanAllAndFilterCursor : public Cursor { using VertexAccessor = accessors::VertexAccessor; - bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) { + bool MakeRequest(ExecutionContext &context) { { SCOPED_REQUEST_WAIT_PROFILE; std::optional request_label = std::nullopt; if (label_.has_value()) { - request_label = request_router.LabelToName(*label_); + request_label = context.request_router->LabelToName(*label_); } - current_batch_ = request_router.ScanVertices(request_label); + current_batch_ = context.request_router->ScanVertices(request_label); } current_vertex_it_ = current_batch_.begin(); return !current_batch_.empty(); @@ -480,7 +480,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { if (current_vertex_it_ == current_batch_.end()) { ResetExecutionState(); - if (!input_cursor_->Pull(frame, context) || !MakeRequest(*context.request_router, context)) { + if (!input_cursor_->Pull(frame, context) || !MakeRequest(context)) { return false; } } @@ -491,66 +491,57 @@ class DistributedScanAllAndFilterCursor : public Cursor { } } - void PrepareNextFrames(ExecutionContext &context) { - auto &request_router = *context.request_router; - - input_cursor_->PullMultiple(*own_multi_frames_, context); - valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer(); - valid_frames_it_ = valid_frames_consumer_->begin(); - - MakeRequest(request_router, context); + bool PullNextFrames(ExecutionContext &context) { + input_cursor_->PullMultiple(*own_multi_frame_, context); + own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer(); + own_frames_it_ = own_frames_consumer_->begin(); + return own_multi_frame_->HasValidFrame(); } - inline bool HasNextFrame() { - return current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end(); + inline bool HasMoreResult() { + return current_vertex_it_ != current_batch_.end() && own_frames_it_ != own_frames_consumer_->end(); } - FrameWithValidity GetNextFrame(ExecutionContext &context) { - MG_ASSERT(HasNextFrame()); + bool PopulateFrame(ExecutionContext &context, FrameWithValidity &frame) { + MG_ASSERT(HasMoreResult()); - auto frame = *valid_frames_it_; + frame = *own_frames_it_; frame[output_symbol_] = TypedValue(*current_vertex_it_); ++current_vertex_it_; if (current_vertex_it_ == current_batch_.end()) { - valid_frames_it_->MakeInvalid(); - ++valid_frames_it_; + own_frames_it_->MakeInvalid(); + ++own_frames_it_; - if (valid_frames_it_ == valid_frames_consumer_->end()) { - PrepareNextFrames(context); - } else { - current_vertex_it_ = current_batch_.begin(); + current_vertex_it_ = current_batch_.begin(); + + if (own_frames_it_ == own_frames_consumer_->end()) { + return PullNextFrames(context); } }; - - return frame; + return true; } void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); - if (!own_multi_frames_.has_value()) { - own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), - kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource())); - PrepareNextFrames(context); + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe, + input_multi_frame.GetMemoryResource())); + + MakeRequest(context); + PullNextFrames(context); } - while (true) { + if (!HasMoreResult()) { + return; + } + + for (auto &frame : input_multi_frame.GetInvalidFramesPopulator()) { if (MustAbort(context)) { throw HintedAbortError(); } - - auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator(); - auto invalid_frame_it = invalid_frames_populator.begin(); - auto has_modified_at_least_one_frame = false; - - while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) { - has_modified_at_least_one_frame = true; - *invalid_frame_it = GetNextFrame(context); - ++invalid_frame_it; - } - - if (!has_modified_at_least_one_frame) { + if (!PopulateFrame(context, frame)) { return; } } @@ -577,9 +568,9 @@ class DistributedScanAllAndFilterCursor : public Cursor { std::optional label_; std::optional> property_expression_pair_; std::optional> filter_expressions_; - std::optional own_multi_frames_; - std::optional valid_frames_consumer_; - ValidFramesConsumer::Iterator valid_frames_it_; + std::optional own_multi_frame_; + std::optional own_frames_consumer_; + ValidFramesConsumer::Iterator own_frames_it_; }; ScanAll::ScanAll(const std::shared_ptr &input, Symbol output_symbol, storage::v3::View view)