Merge pull request #751 from memgraph/T1229-MG-implement-unwind-with-multiframe
Add implementation of `UNWIND` with `MultiFrame`
This commit is contained in:
commit
303362d41c
@ -2039,14 +2039,7 @@ class UnwindCursor : public Cursor {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
|
||||
// successful pull from input, initialize value and iterator
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
||||
storage::v3::View::OLD);
|
||||
TypedValue input_value = self_.input_expression_->Accept(evaluator);
|
||||
if (input_value.type() != TypedValue::Type::List)
|
||||
throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
|
||||
// Copy the evaluted input_value_list to our vector.
|
||||
input_value_ = input_value.ValueList();
|
||||
input_value_it_ = input_value_.begin();
|
||||
SetInputValue(frame, context);
|
||||
}
|
||||
|
||||
// if we reached the end of our list of values goto back to top
|
||||
@ -2057,6 +2050,70 @@ class UnwindCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("UnwindMF");
|
||||
|
||||
if (!own_multi_frame_.has_value()) {
|
||||
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
|
||||
kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
|
||||
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
}
|
||||
|
||||
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||
auto populated_any = false;
|
||||
|
||||
while (true) {
|
||||
switch (state_) {
|
||||
case State::PullInput: {
|
||||
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
|
||||
state_ = State::Exhausted;
|
||||
return populated_any;
|
||||
}
|
||||
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
state_ = State::InitializeInputValue;
|
||||
break;
|
||||
}
|
||||
case State::InitializeInputValue: {
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
state_ = State::PullInput;
|
||||
continue;
|
||||
}
|
||||
SetInputValue(*own_frames_it_, context);
|
||||
state_ = State::PopulateOutput;
|
||||
break;
|
||||
}
|
||||
case State::PopulateOutput: {
|
||||
if (!output_multi_frame.HasInvalidFrame()) {
|
||||
return populated_any;
|
||||
}
|
||||
if (input_value_it_ == input_value_.end()) {
|
||||
own_frames_it_->MakeInvalid();
|
||||
++own_frames_it_;
|
||||
state_ = State::InitializeInputValue;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto output_frame_it = output_frames_populator.begin();
|
||||
output_frame_it != output_frames_populator.end() && input_value_it_ != input_value_.end();
|
||||
++output_frame_it) {
|
||||
auto &output_frame = *output_frame_it;
|
||||
output_frame = *own_frames_it_;
|
||||
output_frame[self_.output_symbol_] = std::move(*input_value_it_);
|
||||
input_value_it_++;
|
||||
populated_any = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case State::Exhausted: {
|
||||
return populated_any;
|
||||
}
|
||||
}
|
||||
}
|
||||
return populated_any;
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override {
|
||||
@ -2065,13 +2122,36 @@ class UnwindCursor : public Cursor {
|
||||
input_value_it_ = input_value_.end();
|
||||
}
|
||||
|
||||
void SetInputValue(Frame &frame, ExecutionContext &context) {
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
||||
storage::v3::View::OLD);
|
||||
TypedValue input_value = self_.input_expression_->Accept(evaluator);
|
||||
if (input_value.type() != TypedValue::Type::List) {
|
||||
throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
|
||||
}
|
||||
// It would be nice if we could move it, however it can be tricky to make it work because of allocators and
|
||||
// different memory resources, be careful.
|
||||
input_value_ = std::move(input_value.ValueList());
|
||||
input_value_it_ = input_value_.begin();
|
||||
}
|
||||
|
||||
private:
|
||||
using InputVector = utils::pmr::vector<TypedValue>;
|
||||
using InputIterator = InputVector::iterator;
|
||||
|
||||
const Unwind &self_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
// typed values we are unwinding and yielding
|
||||
utils::pmr::vector<TypedValue> input_value_;
|
||||
InputVector input_value_;
|
||||
// current position in input_value_
|
||||
decltype(input_value_)::iterator input_value_it_ = input_value_.end();
|
||||
InputIterator input_value_it_ = input_value_.end();
|
||||
|
||||
enum class State { PullInput, InitializeInputValue, PopulateOutput, Exhausted };
|
||||
|
||||
State state_{State::PullInput};
|
||||
std::optional<MultiFrame> own_multi_frame_;
|
||||
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator own_frames_it_;
|
||||
};
|
||||
|
||||
UniqueCursorPtr Unwind::MakeCursor(utils::MemoryResource *mem) const {
|
||||
|
Loading…
Reference in New Issue
Block a user