Move AggregateCursor from LCP to CPP

Reviewers: mtomic, llugovic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2056
This commit is contained in:
Teon Banek 2019-05-15 17:09:04 +02:00
parent f9471f341d
commit 98a853a95c
2 changed files with 273 additions and 302 deletions

View File

@ -56,6 +56,16 @@
namespace query::plan {
bool TypedValueVectorEqual::operator()(
const std::vector<TypedValue> &left,
const std::vector<TypedValue> &right) const {
DCHECK(left.size() == right.size())
<< "TypedValueVector comparison should only be done over vectors "
"of the same size";
return std::equal(left.begin(), left.end(), right.begin(),
TypedValue::BoolEqual{});
}
namespace {
// Returns boolean result of evaluating filter expression. Null is treated as
@ -2356,22 +2366,12 @@ Aggregate::Aggregate(const std::shared_ptr<LogicalOperator> &input,
ACCEPT_WITH_INPUT(Aggregate)
UniqueCursorPtr Aggregate::MakeCursor(database::GraphDbAccessor *db,
utils::MemoryResource *mem) const {
return MakeUniqueCursorPtr<AggregateCursor>(mem, *this, db, mem);
}
std::vector<Symbol> Aggregate::ModifiedSymbols(const SymbolTable &) const {
auto symbols = remember_;
for (const auto &elem : aggregations_) symbols.push_back(elem.output_sym);
return symbols;
}
Aggregate::AggregateCursor::AggregateCursor(const Aggregate &self,
database::GraphDbAccessor *db,
utils::MemoryResource *mem)
: self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {}
namespace {
/** Returns the default TypedValue for an Aggregation element.
* This value is valid both for returning when where are no inputs
@ -2394,248 +2394,302 @@ TypedValue DefaultAggregationOpValue(const Aggregate::Element &element) {
}
} // namespace
bool Aggregate::AggregateCursor::Pull(Frame &frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("Aggregate");
class AggregateCursor : public Cursor {
public:
AggregateCursor(const Aggregate &self, database::GraphDbAccessor *db,
utils::MemoryResource *mem)
: self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {}
if (!pulled_all_input_) {
ProcessAll(frame, context);
pulled_all_input_ = true;
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("Aggregate");
if (!pulled_all_input_) {
ProcessAll(&frame, &context);
pulled_all_input_ = true;
aggregation_it_ = aggregation_.begin();
// in case there is no input and no group_bys we need to return true
// just this once
if (aggregation_.empty() && self_.group_by_.empty()) {
// place default aggregation values on the frame
for (const auto &elem : self_.aggregations_)
frame[elem.output_sym] = DefaultAggregationOpValue(elem);
// place null as remember values on the frame
for (const Symbol &remember_sym : self_.remember_)
frame[remember_sym] = TypedValue::Null;
return true;
}
}
if (aggregation_it_ == aggregation_.end()) return false;
// place aggregation values on the frame
auto aggregation_values_it = aggregation_it_->second.values_.begin();
for (const auto &aggregation_elem : self_.aggregations_)
frame[aggregation_elem.output_sym] = *aggregation_values_it++;
// place remember values on the frame
auto remember_values_it = aggregation_it_->second.remember_.begin();
for (const Symbol &remember_sym : self_.remember_)
frame[remember_sym] = *remember_values_it++;
aggregation_it_++;
return true;
}
void Shutdown() override { input_cursor_->Shutdown(); }
void Reset() override {
input_cursor_->Reset();
aggregation_.clear();
aggregation_it_ = aggregation_.begin();
pulled_all_input_ = false;
}
// in case there is no input and no group_bys we need to return true
// just this once
if (aggregation_.empty() && self_.group_by_.empty()) {
// place default aggregation values on the frame
for (const auto &elem : self_.aggregations_)
frame[elem.output_sym] = DefaultAggregationOpValue(elem);
// place null as remember values on the frame
for (const Symbol &remember_sym : self_.remember_)
frame[remember_sym] = TypedValue::Null;
return true;
private:
// Data structure for a single aggregation cache.
// Does NOT include the group-by values since those are a key in the
// aggregation map. The vectors in an AggregationValue contain one element for
// each aggregation in this LogicalOp.
struct AggregationValue {
// how many input rows have been aggregated in respective values_ element so
// far
std::vector<int> counts_;
// aggregated values. Initially Null (until at least one input row with a
// valid value gets processed)
std::vector<TypedValue> values_;
// remember values.
std::vector<TypedValue> remember_;
};
const Aggregate &self_;
const UniqueCursorPtr input_cursor_;
// storage for aggregated data
// map key is the vector of group-by values
// map value is an AggregationValue struct
std::unordered_map<std::vector<TypedValue>, AggregationValue,
// use FNV collection hashing specialized for a vector of
// TypedValues
utils::FnvCollection<std::vector<TypedValue>, TypedValue,
TypedValue::Hash>,
// custom equality
TypedValueVectorEqual>
aggregation_;
// iterator over the accumulated cache
decltype(aggregation_.begin()) aggregation_it_ = aggregation_.begin();
// this LogicalOp pulls all from the input on it's first pull
// this switch tracks if this has been performed
bool pulled_all_input_{false};
/**
* Pulls from the input operator until exhausted and aggregates the
* results. If the input operator is not provided, a single call
* to ProcessOne is issued.
*
* Accumulation automatically groups the results so that `aggregation_`
* cache cardinality depends on number of
* aggregation results, and not on the number of inputs.
*/
void ProcessAll(Frame *frame, ExecutionContext *context) {
ExpressionEvaluator evaluator(frame, context->symbol_table,
context->evaluation_context,
context->db_accessor, GraphView::NEW);
while (input_cursor_->Pull(*frame, *context)) {
ProcessOne(*frame, &evaluator);
}
// calculate AVG aggregations (so far they have only been summed)
for (int pos = 0; pos < static_cast<int>(self_.aggregations_.size());
++pos) {
if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue;
for (auto &kv : aggregation_) {
AggregationValue &agg_value = kv.second;
int count = agg_value.counts_[pos];
if (count > 0)
agg_value.values_[pos] = agg_value.values_[pos] / (double)count;
}
}
}
if (aggregation_it_ == aggregation_.end()) return false;
// place aggregation values on the frame
auto aggregation_values_it = aggregation_it_->second.values_.begin();
for (const auto &aggregation_elem : self_.aggregations_)
frame[aggregation_elem.output_sym] = *aggregation_values_it++;
// place remember values on the frame
auto remember_values_it = aggregation_it_->second.remember_.begin();
for (const Symbol &remember_sym : self_.remember_)
frame[remember_sym] = *remember_values_it++;
aggregation_it_++;
return true;
}
void Aggregate::AggregateCursor::ProcessAll(Frame &frame,
ExecutionContext &context) {
ExpressionEvaluator evaluator(&frame, context.symbol_table,
context.evaluation_context, context.db_accessor,
GraphView::NEW);
while (input_cursor_->Pull(frame, context)) {
ProcessOne(frame, context.symbol_table, evaluator);
}
// calculate AVG aggregations (so far they have only been summed)
for (int pos = 0; pos < static_cast<int>(self_.aggregations_.size()); ++pos) {
if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue;
for (auto &kv : aggregation_) {
AggregationValue &agg_value = kv.second;
int count = agg_value.counts_[pos];
if (count > 0)
agg_value.values_[pos] = agg_value.values_[pos] / (double)count;
/**
* Performs a single accumulation.
*/
void ProcessOne(const Frame &frame, ExpressionEvaluator *evaluator) {
std::vector<TypedValue> group_by;
group_by.reserve(self_.group_by_.size());
for (Expression *expression : self_.group_by_) {
group_by.emplace_back(expression->Accept(*evaluator));
}
}
}
void Aggregate::AggregateCursor::ProcessOne(Frame &frame,
const SymbolTable &symbol_table,
ExpressionEvaluator &evaluator) {
std::vector<TypedValue> group_by;
group_by.reserve(self_.group_by_.size());
for (Expression *expression : self_.group_by_) {
group_by.emplace_back(expression->Accept(evaluator));
auto &agg_value = aggregation_[group_by];
EnsureInitialized(frame, &agg_value);
Update(evaluator, &agg_value);
}
AggregationValue &agg_value = aggregation_[group_by];
EnsureInitialized(frame, agg_value);
Update(frame, symbol_table, evaluator, agg_value);
}
/** Ensures the new AggregationValue has been initialized. This means
* that the value vectors are filled with an appropriate number of Nulls,
* counts are set to 0 and remember values are remembered.
*/
void EnsureInitialized(const Frame &frame,
AggregateCursor::AggregationValue *agg_value) const {
if (!agg_value->values_.empty()) return;
void Aggregate::AggregateCursor::EnsureInitialized(
Frame &frame,
Aggregate::AggregateCursor::AggregationValue &agg_value) const {
if (agg_value.values_.size() > 0) return;
for (const auto &agg_elem : self_.aggregations_)
agg_value->values_.emplace_back(DefaultAggregationOpValue(agg_elem));
agg_value->counts_.resize(self_.aggregations_.size(), 0);
for (const auto &agg_elem : self_.aggregations_)
agg_value.values_.emplace_back(DefaultAggregationOpValue(agg_elem));
agg_value.counts_.resize(self_.aggregations_.size(), 0);
for (const Symbol &remember_sym : self_.remember_)
agg_value->remember_.push_back(frame[remember_sym]);
}
for (const Symbol &remember_sym : self_.remember_)
agg_value.remember_.push_back(frame[remember_sym]);
}
/** Updates the given AggregationValue with new data. Assumes that
* the AggregationValue has been initialized */
void Update(ExpressionEvaluator *evaluator,
AggregateCursor::AggregationValue *agg_value) {
DCHECK(self_.aggregations_.size() == agg_value->values_.size())
<< "Expected as much AggregationValue.values_ as there are "
"aggregations.";
DCHECK(self_.aggregations_.size() == agg_value->counts_.size())
<< "Expected as much AggregationValue.counts_ as there are "
"aggregations.";
void Aggregate::AggregateCursor::Update(
Frame &, const SymbolTable &, ExpressionEvaluator &evaluator,
Aggregate::AggregateCursor::AggregationValue &agg_value) {
DCHECK(self_.aggregations_.size() == agg_value.values_.size())
<< "Expected as much AggregationValue.values_ as there are "
"aggregations.";
DCHECK(self_.aggregations_.size() == agg_value.counts_.size())
<< "Expected as much AggregationValue.counts_ as there are "
"aggregations.";
// we iterate over counts, values and aggregation info at the same time
auto count_it = agg_value->counts_.begin();
auto value_it = agg_value->values_.begin();
auto agg_elem_it = self_.aggregations_.begin();
for (; count_it < agg_value->counts_.end();
count_it++, value_it++, agg_elem_it++) {
// COUNT(*) is the only case where input expression is optional
// handle it here
auto input_expr_ptr = agg_elem_it->value;
if (!input_expr_ptr) {
*count_it += 1;
*value_it = *count_it;
continue;
}
// we iterate over counts, values and aggregation info at the same time
auto count_it = agg_value.counts_.begin();
auto value_it = agg_value.values_.begin();
auto agg_elem_it = self_.aggregations_.begin();
for (; count_it < agg_value.counts_.end();
count_it++, value_it++, agg_elem_it++) {
// COUNT(*) is the only case where input expression is optional
// handle it here
auto input_expr_ptr = agg_elem_it->value;
if (!input_expr_ptr) {
TypedValue input_value = input_expr_ptr->Accept(*evaluator);
// Aggregations skip Null input values.
if (input_value.IsNull()) continue;
const auto &agg_op = agg_elem_it->op;
*count_it += 1;
*value_it = *count_it;
continue;
}
if (*count_it == 1) {
// first value, nothing to aggregate. check type, set and continue.
switch (agg_op) {
case Aggregation::Op::MIN:
case Aggregation::Op::MAX:
*value_it = input_value;
EnsureOkForMinMax(input_value);
break;
case Aggregation::Op::SUM:
case Aggregation::Op::AVG:
*value_it = input_value;
EnsureOkForAvgSum(input_value);
break;
case Aggregation::Op::COUNT:
*value_it = 1;
break;
case Aggregation::Op::COLLECT_LIST:
value_it->Value<std::vector<TypedValue>>().push_back(input_value);
break;
case Aggregation::Op::COLLECT_MAP:
auto key = agg_elem_it->key->Accept(*evaluator);
if (key.type() != TypedValue::Type::String)
throw QueryRuntimeException("Map key must be a string.");
value_it->Value<std::map<std::string, TypedValue>>().emplace(
key.Value<std::string>(), input_value);
break;
}
continue;
}
TypedValue input_value = input_expr_ptr->Accept(evaluator);
// Aggregations skip Null input values.
if (input_value.IsNull()) continue;
const auto &agg_op = agg_elem_it->op;
*count_it += 1;
if (*count_it == 1) {
// first value, nothing to aggregate. check type, set and continue.
// aggregation of existing values
switch (agg_op) {
case Aggregation::Op::MIN:
case Aggregation::Op::MAX:
*value_it = input_value;
EnsureOkForMinMax(input_value);
break;
case Aggregation::Op::SUM:
case Aggregation::Op::AVG:
*value_it = input_value;
EnsureOkForAvgSum(input_value);
break;
case Aggregation::Op::COUNT:
*value_it = 1;
*value_it = *count_it;
break;
case Aggregation::Op::MIN: {
EnsureOkForMinMax(input_value);
try {
TypedValue comparison_result = input_value < *value_it;
// since we skip nulls we either have a valid comparison, or
// an exception was just thrown above
// safe to assume a bool TypedValue
if (comparison_result.Value<bool>()) *value_it = input_value;
} catch (const TypedValueException &) {
throw QueryRuntimeException("Unable to get MIN of '{}' and '{}'.",
input_value.type(), value_it->type());
}
break;
}
case Aggregation::Op::MAX: {
// all comments as for Op::Min
EnsureOkForMinMax(input_value);
try {
TypedValue comparison_result = input_value > *value_it;
if (comparison_result.Value<bool>()) *value_it = input_value;
} catch (const TypedValueException &) {
throw QueryRuntimeException("Unable to get MAX of '{}' and '{}'.",
input_value.type(), value_it->type());
}
break;
}
case Aggregation::Op::AVG:
// for averaging we sum first and divide by count once all
// the input has been processed
case Aggregation::Op::SUM:
EnsureOkForAvgSum(input_value);
*value_it = *value_it + input_value;
break;
case Aggregation::Op::COLLECT_LIST:
value_it->Value<std::vector<TypedValue>>().push_back(input_value);
break;
case Aggregation::Op::COLLECT_MAP:
auto key = agg_elem_it->key->Accept(evaluator);
auto key = agg_elem_it->key->Accept(*evaluator);
if (key.type() != TypedValue::Type::String)
throw QueryRuntimeException("Map key must be a string.");
value_it->Value<std::map<std::string, TypedValue>>().emplace(
key.Value<std::string>(), input_value);
break;
}
continue;
} // end switch over Aggregation::Op enum
} // end loop over all aggregations
}
/** Checks if the given TypedValue is legal in MIN and MAX. If not
* an appropriate exception is thrown. */
void EnsureOkForMinMax(const TypedValue &value) const {
switch (value.type()) {
case TypedValue::Type::Bool:
case TypedValue::Type::Int:
case TypedValue::Type::Double:
case TypedValue::Type::String:
return;
default:
throw QueryRuntimeException(
"Only boolean, numeric and string values are allowed in "
"MIN and MAX aggregations.");
}
// aggregation of existing values
switch (agg_op) {
case Aggregation::Op::COUNT:
*value_it = *count_it;
break;
case Aggregation::Op::MIN: {
EnsureOkForMinMax(input_value);
try {
TypedValue comparison_result = input_value < *value_it;
// since we skip nulls we either have a valid comparison, or
// an exception was just thrown above
// safe to assume a bool TypedValue
if (comparison_result.Value<bool>()) *value_it = input_value;
} catch (const TypedValueException &) {
throw QueryRuntimeException("Unable to get MIN of '{}' and '{}'.",
input_value.type(), value_it->type());
}
break;
}
case Aggregation::Op::MAX: {
// all comments as for Op::Min
EnsureOkForMinMax(input_value);
try {
TypedValue comparison_result = input_value > *value_it;
if (comparison_result.Value<bool>()) *value_it = input_value;
} catch (const TypedValueException &) {
throw QueryRuntimeException("Unable to get MAX of '{}' and '{}'.",
input_value.type(), value_it->type());
}
break;
}
case Aggregation::Op::AVG:
// for averaging we sum first and divide by count once all
// the input has been processed
case Aggregation::Op::SUM:
EnsureOkForAvgSum(input_value);
*value_it = *value_it + input_value;
break;
case Aggregation::Op::COLLECT_LIST:
value_it->Value<std::vector<TypedValue>>().push_back(input_value);
break;
case Aggregation::Op::COLLECT_MAP:
auto key = agg_elem_it->key->Accept(evaluator);
if (key.type() != TypedValue::Type::String)
throw QueryRuntimeException("Map key must be a string.");
value_it->Value<std::map<std::string, TypedValue>>().emplace(
key.Value<std::string>(), input_value);
break;
} // end switch over Aggregation::Op enum
} // end loop over all aggregations
}
void Aggregate::AggregateCursor::Shutdown() { input_cursor_->Shutdown(); }
void Aggregate::AggregateCursor::Reset() {
input_cursor_->Reset();
aggregation_.clear();
aggregation_it_ = aggregation_.begin();
pulled_all_input_ = false;
}
void Aggregate::AggregateCursor::EnsureOkForMinMax(
const TypedValue &value) const {
switch (value.type()) {
case TypedValue::Type::Bool:
case TypedValue::Type::Int:
case TypedValue::Type::Double:
case TypedValue::Type::String:
return;
default:
throw QueryRuntimeException(
"Only boolean, numeric and string values are allowed in "
"MIN and MAX aggregations.");
}
}
void Aggregate::AggregateCursor::EnsureOkForAvgSum(
const TypedValue &value) const {
switch (value.type()) {
case TypedValue::Type::Int:
case TypedValue::Type::Double:
return;
default:
throw QueryRuntimeException(
"Only numeric values allowed in SUM and AVG aggregations.");
/** Checks if the given TypedValue is legal in AVG and SUM. If not
* an appropriate exception is thrown. */
void EnsureOkForAvgSum(const TypedValue &value) const {
switch (value.type()) {
case TypedValue::Type::Int:
case TypedValue::Type::Double:
return;
default:
throw QueryRuntimeException(
"Only numeric values allowed in SUM and AVG aggregations.");
}
}
};
UniqueCursorPtr Aggregate::MakeCursor(database::GraphDbAccessor *db,
utils::MemoryResource *mem) const {
return MakeUniqueCursorPtr<AggregateCursor>(mem, *this, db, mem);
}
bool TypedValueVectorEqual::operator()(
const std::vector<TypedValue> &left,
const std::vector<TypedValue> &right) const {
DCHECK(left.size() == right.size())
<< "TypedValueVector comparison should only be done over vectors "
"of the same size";
return std::equal(left.begin(), left.end(), right.begin(),
TypedValue::BoolEqual{});
}
Skip::Skip(const std::shared_ptr<LogicalOperator> &input,
Expression *expression)

View File

@ -1628,89 +1628,6 @@ elements are in an undefined state after aggregation.")
input_ = input;
}
cpp<#)
(:private
#>cpp
class AggregateCursor : public Cursor {
public:
AggregateCursor(const Aggregate &, database::GraphDbAccessor *,
utils::MemoryResource *);
bool Pull(Frame &, ExecutionContext &) override;
void Shutdown() override;
void Reset() override;
private:
// Data structure for a single aggregation cache.
// Does NOT include the group-by values since those
// are a key in the aggregation map.
// The vectors in an AggregationValue contain one element
// for each aggregation in this LogicalOp.
struct AggregationValue {
// how many input rows have been aggregated in respective
// values_ element so far
std::vector<int> counts_;
// aggregated values. Initially Null (until at least one
// input row with a valid value gets processed)
std::vector<TypedValue> values_;
// remember values.
std::vector<TypedValue> remember_;
};
const Aggregate &self_;
const UniqueCursorPtr input_cursor_;
// storage for aggregated data
// map key is the vector of group-by values
// map value is an AggregationValue struct
std::unordered_map<
std::vector<TypedValue>, AggregationValue,
// use FNV collection hashing specialized for a vector of TypedValues
utils::FnvCollection<std::vector<TypedValue>, TypedValue,
TypedValue::Hash>,
// custom equality
TypedValueVectorEqual>
aggregation_;
// iterator over the accumulated cache
decltype(aggregation_.begin()) aggregation_it_ = aggregation_.begin();
// this LogicalOp pulls all from the input on it's first pull
// this switch tracks if this has been performed
bool pulled_all_input_{false};
/**
* Pulls from the input operator until exhausted and aggregates the
* results. If the input operator is not provided, a single call
* to ProcessOne is issued.
*
* Accumulation automatically groups the results so that `aggregation_`
* cache cardinality depends on number of
* aggregation results, and not on the number of inputs.
*/
void ProcessAll(Frame &, ExecutionContext &);
/**
* Performs a single accumulation.
*/
void ProcessOne(Frame &frame, const SymbolTable &symbolTable,
ExpressionEvaluator &evaluator);
/** Ensures the new AggregationValue has been initialized. This means
* that the value vectors are filled with an appropriate number of Nulls,
* counts are set to 0 and remember values are remembered.
*/
void EnsureInitialized(Frame &frame, AggregationValue &agg_value) const;
/** Updates the given AggregationValue with new data. Assumes that
* the AggregationValue has been initialized */
void Update(Frame &frame, const SymbolTable &symbol_table,
ExpressionEvaluator &evaluator, AggregationValue &agg_value);
/** Checks if the given TypedValue is legal in MIN and MAX. If not
* an appropriate exception is thrown. */
void EnsureOkForMinMax(const TypedValue &value) const;
/** Checks if the given TypedValue is legal in AVG and SUM. If not
* an appropriate exception is thrown. */
void EnsureOkForAvgSum(const TypedValue &value) const;
};
cpp<#)
(:serialize (:slk))
(:clone))