Refactor AggregateCursor::ProcessAll
This commit is contained in:
parent
2ecf580ae7
commit
d36c0cc424
@ -1324,14 +1324,13 @@ class AggregateCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override {
|
||||
bool PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("AggregateMF");
|
||||
|
||||
if (!pulled_all_input_) {
|
||||
while (!pulled_all_input_) {
|
||||
ProcessAll(multi_frame, &context);
|
||||
}
|
||||
multi_frame.MakeAllFramesInvalid();
|
||||
ProcessAll(multi_frame, &context);
|
||||
pulled_all_input_ = true;
|
||||
MG_ASSERT(!multi_frame.HasValidFrame(), "ProcessAll didn't consumed all input frames!");
|
||||
aggregation_it_ = aggregation_.begin();
|
||||
|
||||
// in case there is no input and no group_bys we need to return true
|
||||
@ -1348,12 +1347,12 @@ class AggregateCursor : public Cursor {
|
||||
for (const Symbol &remember_sym : self_.remember_) {
|
||||
frame[remember_sym] = TypedValue(pull_memory);
|
||||
}
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (aggregation_it_ == aggregation_.end()) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// place aggregation values on the frame
|
||||
@ -1371,6 +1370,7 @@ class AggregateCursor : public Cursor {
|
||||
}
|
||||
|
||||
aggregation_it_++;
|
||||
return true;
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
@ -1452,19 +1452,15 @@ class AggregateCursor : public Cursor {
|
||||
}
|
||||
|
||||
void ProcessAll(MultiFrame &multi_frame, ExecutionContext *context) {
|
||||
input_cursor_->PullMultiple(multi_frame, *context);
|
||||
auto valid_frames_modifier =
|
||||
multi_frame.GetValidFramesConsumer(); // consumer is needed i.o. reader because of the evaluator
|
||||
if (valid_frames_modifier.begin() == valid_frames_modifier.end()) {
|
||||
// There are no valid frames, we stop
|
||||
pulled_all_input_ = true;
|
||||
return;
|
||||
}
|
||||
while (input_cursor_->PullMultiple(multi_frame, *context)) {
|
||||
auto valid_frames_modifier =
|
||||
multi_frame.GetValidFramesConsumer(); // consumer is needed i.o. reader because of the evaluator
|
||||
|
||||
for (auto &frame : valid_frames_modifier) {
|
||||
ExpressionEvaluator evaluator(&frame, context->symbol_table, context->evaluation_context, context->request_router,
|
||||
storage::v3::View::NEW);
|
||||
ProcessOne(frame, &evaluator);
|
||||
for (auto &frame : valid_frames_modifier) {
|
||||
ExpressionEvaluator evaluator(&frame, context->symbol_table, context->evaluation_context,
|
||||
context->request_router, storage::v3::View::NEW);
|
||||
ProcessOne(frame, &evaluator);
|
||||
}
|
||||
}
|
||||
|
||||
// calculate AVG aggregations (so far they have only been summed)
|
||||
|
Loading…
Reference in New Issue
Block a user