diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 1139c0fbd..73de9ee28 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -761,7 +761,7 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { output_frame[output_symbol_] = TypedValue(it->second); populated_any = true; ++output_frame_it; - } + } own_frames_it_->MakeInvalid(); } break; @@ -1333,28 +1333,47 @@ bool ContainsSameEdge(const TypedValue &a, const TypedValue &b) { return a.ValueEdge() == b.ValueEdge(); } + +bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector &previous_symbols) { + // This shouldn't raise a TypedValueException, because the planner + // makes sure these are all of the expected type. In case they are not + // an error should be raised long before this code is executed. + return std::ranges::all_of(previous_symbols, + [&frame, &expand_value = frame[expand_symbol]](const auto &previous_symbol) { + const auto &previous_value = frame[previous_symbol]; + return !ContainsSameEdge(previous_value, expand_value); + }); +} + } // namespace bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("EdgeUniquenessFilter"); - - auto expansion_ok = [&]() { - const auto &expand_value = frame[self_.expand_symbol_]; - for (const auto &previous_symbol : self_.previous_symbols_) { - const auto &previous_value = frame[previous_symbol]; - // This shouldn't raise a TypedValueException, because the planner - // makes sure these are all of the expected type. In case they are not - // an error should be raised long before this code is executed. - if (ContainsSameEdge(previous_value, expand_value)) return false; - } - return true; - }; - while (input_cursor_->Pull(frame, context)) - if (expansion_ok()) return true; + if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) return true; return false; } +bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame &output_multi_frame, + ExecutionContext &context) { + SCOPED_PROFILE_OP("EdgeUniquenessFilterMF"); + auto populated_any = false; + + while (output_multi_frame.HasInvalidFrame()) { + if (!input_cursor_->PullMultiple(output_multi_frame, context)) { + return populated_any; + } + for (auto &frame : output_multi_frame.GetValidFramesConsumer()) { + if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) { + populated_any = true; + } else { + frame.MakeInvalid(); + } + } + } + return populated_any; +} + void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Shutdown() { input_cursor_->Shutdown(); } void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Reset() { input_cursor_->Reset(); } diff --git a/src/query/v2/plan/operator.lcp b/src/query/v2/plan/operator.lcp index 4f34cc061..110ba8a33 100644 --- a/src/query/v2/plan/operator.lcp +++ b/src/query/v2/plan/operator.lcp @@ -1570,6 +1570,7 @@ edge lists).") EdgeUniquenessFilterCursor(const EdgeUniquenessFilter &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; + bool PullMultiple(MultiFrame &, ExecutionContext &) override; void Shutdown() override; void Reset() override;