diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp index e13eb07ac..aeacda7d3 100644 --- a/src/query/v2/multiframe.hpp +++ b/src/query/v2/multiframe.hpp @@ -200,9 +200,9 @@ class ValidFramesConsumer { explicit ValidFramesConsumer(MultiFrame &multiframe); ~ValidFramesConsumer() noexcept; - ValidFramesConsumer(const ValidFramesConsumer &other) = delete; - ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete; - ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete; + ValidFramesConsumer(const ValidFramesConsumer &other) = default; + ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default; + ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = default; ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete; struct Iterator { diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 1569840c4..3e8d533c5 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -508,26 +508,39 @@ class DistributedScanAllAndFilterCursor : public Cursor { } } - void Generate(ExecutionContext &context) { + void PrepareNextFrames(ExecutionContext &context) { auto &request_router = *context.request_router; input_cursor_->PullMultiple(*own_multi_frames_, context); + valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer(); + valid_frames_it_ = valid_frames_consumer_->begin(); - if (!MakeRequest(request_router, context)) { - return; - } + MakeRequest(request_router, context); - auto valid_frames_consumer = own_multi_frames_->GetValidFramesConsumer(); + has_next_frame_ = current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end(); + } - for (auto valid_frames_it = valid_frames_consumer.begin(); valid_frames_it != valid_frames_consumer.end(); - ++valid_frames_it) { - for (auto vertex_it = current_batch_.begin(); vertex_it != current_batch_.end(); ++vertex_it) { - auto frame = *valid_frames_it; - frame[output_symbol_] = TypedValue(*vertex_it); - frames_buffer_.push(std::move(frame)); + inline bool HasNextFrame() { return has_next_frame_; } + + FrameWithValidity GetNextFrame(ExecutionContext &context) { + MG_ASSERT(HasNextFrame()); + + auto frame = *valid_frames_it_; + frame[output_symbol_] = TypedValue(*current_vertex_it_); + + ++current_vertex_it_; + if (current_vertex_it_ == current_batch_.end()) { + valid_frames_it_->MakeInvalid(); + ++valid_frames_it_; + + if (valid_frames_it_ == valid_frames_consumer_->end()) { + PrepareNextFrames(context); + } else { + current_vertex_it_ = current_batch_.begin(); } - valid_frames_it->MakeInvalid(); - } + }; + + return frame; } void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override { @@ -536,6 +549,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { if (!own_multi_frames_.has_value()) { own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource())); + PrepareNextFrames(context); } while (true) { @@ -543,19 +557,14 @@ class DistributedScanAllAndFilterCursor : public Cursor { throw HintedAbortError(); } - const auto should_generate = frames_buffer_.empty(); - if (should_generate) { - Generate(context); - } - auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator(); auto invalid_frame_it = invalid_frames_populator.begin(); auto has_modified_at_least_one_frame = false; - while (invalid_frames_populator.end() != invalid_frame_it && !frames_buffer_.empty()) { + + while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) { has_modified_at_least_one_frame = true; - *invalid_frame_it = frames_buffer_.front(); + *invalid_frame_it = GetNextFrame(context); ++invalid_frame_it; - frames_buffer_.pop(); } if (!has_modified_at_least_one_frame) { @@ -588,7 +597,10 @@ class DistributedScanAllAndFilterCursor : public Cursor { std::optional> property_expression_pair_; std::optional> filter_expressions_; std::optional own_multi_frames_; + std::optional valid_frames_consumer_; + ValidFramesConsumer::Iterator valid_frames_it_; std::queue frames_buffer_; + bool has_next_frame_; }; ScanAll::ScanAll(const std::shared_ptr &input, Symbol output_symbol, storage::v3::View view)