From 33454c7d8ee81222f0aaf2065a4af5e67998b8be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Fri, 27 Jan 2023 08:43:10 +0100 Subject: [PATCH] Add implementation --- src/query/v2/plan/operator.cpp | 100 +++++++++++++++++++++++++++++---- 1 file changed, 90 insertions(+), 10 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 50e5d4fca..190a993a9 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1972,14 +1972,7 @@ class UnwindCursor : public Cursor { if (!input_cursor_->Pull(frame, context)) return false; // successful pull from input, initialize value and iterator - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, - storage::v3::View::OLD); - TypedValue input_value = self_.input_expression_->Accept(evaluator); - if (input_value.type() != TypedValue::Type::List) - throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); - // Copy the evaluted input_value_list to our vector. - input_value_ = input_value.ValueList(); - input_value_it_ = input_value_.begin(); + SetInputValue(frame, context); } // if we reached the end of our list of values goto back to top @@ -1990,6 +1983,70 @@ class UnwindCursor : public Cursor { } } + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP("UnwindMF"); + + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + auto populated_any = false; + + while (true) { + switch (state_) { + case State::PullInput: { + if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { + state_ = State::Exhausted; + return populated_any; + } + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + state_ = State::InitializeInputValue; + break; + } + case State::InitializeInputValue: { + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; + continue; + } + SetInputValue(*own_frames_it_, context); + state_ = State::PopulateOutput; + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + return populated_any; + } + if (input_value_it_ == input_value_.end()) { + own_frames_it_->MakeInvalid(); + ++own_frames_it_; + state_ = State::InitializeInputValue; + continue; + } + + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && input_value_it_ != input_value_.end(); + ++output_frame_it) { + auto &output_frame = *output_frame_it; + output_frame = *own_frames_it_; + output_frame[self_.output_symbol_] = std::move(*input_value_it_); + input_value_it_++; + populated_any = true; + } + break; + } + case State::Exhausted: { + return populated_any; + } + } + } + return populated_any; + } + void Shutdown() override { input_cursor_->Shutdown(); } void Reset() override { @@ -1998,13 +2055,36 @@ class UnwindCursor : public Cursor { input_value_it_ = input_value_.end(); } + void SetInputValue(Frame &frame, ExecutionContext &context) { + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); + TypedValue input_value = self_.input_expression_->Accept(evaluator); + if (input_value.type() != TypedValue::Type::List) { + throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); + } + // It would be nice if we could move it, however it can be tricky to make it work because of allocators and + // different memory resources, be careful. + input_value_ = std::move(input_value.ValueList()); + input_value_it_ = input_value_.begin(); + } + private: + using InputVector = utils::pmr::vector; + using InputIterator = InputVector::iterator; + const Unwind &self_; const UniqueCursorPtr input_cursor_; // typed values we are unwinding and yielding - utils::pmr::vector input_value_; + InputVector input_value_; // current position in input_value_ - decltype(input_value_)::iterator input_value_it_ = input_value_.end(); + InputIterator input_value_it_ = input_value_.end(); + + enum class State { PullInput, InitializeInputValue, PopulateOutput, Exhausted }; + + State state_{State::PullInput}; + std::optional own_multi_frame_; + std::optional own_frames_consumer_; + ValidFramesConsumer::Iterator own_frames_it_; }; UniqueCursorPtr Unwind::MakeCursor(utils::MemoryResource *mem) const {