Unify logic between multi and single frame pull
This commit is contained in:
parent
d36c0cc424
commit
44fc2d01c7
@ -1437,18 +1437,7 @@ class AggregateCursor : public Cursor {
|
||||
ProcessOne(*frame, &evaluator);
|
||||
}
|
||||
|
||||
// calculate AVG aggregations (so far they have only been summed)
|
||||
for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) {
|
||||
if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue;
|
||||
for (auto &kv : aggregation_) {
|
||||
AggregationValue &agg_value = kv.second;
|
||||
auto count = agg_value.counts_[pos];
|
||||
auto *pull_memory = context->evaluation_context.memory;
|
||||
if (count > 0) {
|
||||
agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast<double>(count), pull_memory);
|
||||
}
|
||||
}
|
||||
}
|
||||
CalculateAverages(*context);
|
||||
}
|
||||
|
||||
void ProcessAll(MultiFrame &multi_frame, ExecutionContext *context) {
|
||||
@ -1463,18 +1452,7 @@ class AggregateCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
// calculate AVG aggregations (so far they have only been summed)
|
||||
for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) {
|
||||
if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue;
|
||||
for (auto &kv : aggregation_) {
|
||||
AggregationValue &agg_value = kv.second;
|
||||
auto count = agg_value.counts_[pos];
|
||||
auto *pull_memory = context->evaluation_context.memory;
|
||||
if (count > 0) {
|
||||
agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast<double>(count), pull_memory);
|
||||
}
|
||||
}
|
||||
}
|
||||
CalculateAverages(*context);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1492,19 +1470,18 @@ class AggregateCursor : public Cursor {
|
||||
Update(evaluator, &agg_value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a single accumulation.
|
||||
*/
|
||||
void ProcessOne(FrameWithValidity &frame, ExpressionEvaluator *evaluator) {
|
||||
auto *mem = aggregation_.get_allocator().GetMemoryResource();
|
||||
utils::pmr::vector<TypedValue> group_by(mem);
|
||||
group_by.reserve(self_.group_by_.size());
|
||||
for (Expression *expression : self_.group_by_) {
|
||||
group_by.emplace_back(expression->Accept(*evaluator));
|
||||
void CalculateAverages(ExecutionContext &context) {
|
||||
for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) {
|
||||
if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue;
|
||||
for (auto &kv : aggregation_) {
|
||||
AggregationValue &agg_value = kv.second;
|
||||
auto count = agg_value.counts_[pos];
|
||||
auto *pull_memory = context.evaluation_context.memory;
|
||||
if (count > 0) {
|
||||
agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast<double>(count), pull_memory);
|
||||
}
|
||||
}
|
||||
}
|
||||
auto &agg_value = aggregation_.try_emplace(std::move(group_by), mem).first->second;
|
||||
EnsureInitialized(frame, &agg_value);
|
||||
Update(evaluator, &agg_value);
|
||||
}
|
||||
|
||||
/** Ensures the new AggregationValue has been initialized. This means
|
||||
|
Loading…
Reference in New Issue
Block a user