Use MemoryResource in AggregationCursor

Summary: Micro benchmarks show some improvement, but unfortunately not much.

Reviewers: mtomic, llugovic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2058
This commit is contained in:
Teon Banek 2019-05-16 13:48:35 +02:00
parent 6c84092023
commit 0075eee58b
2 changed files with 84 additions and 24 deletions
src/query/plan
tests/benchmark/query

View File

@ -68,6 +68,16 @@ bool TypedValueVectorEqual::operator()(
namespace {
struct TypedValueVectorAllocatorEqual {
bool operator()(
const std::vector<TypedValue, utils::Allocator<TypedValue>> &left,
const std::vector<TypedValue, utils::Allocator<TypedValue>> &right)
const {
return std::equal(left.begin(), left.end(), right.begin(), right.end(),
TypedValue::BoolEqual());
}
};
// Returns boolean result of evaluating filter expression. Null is treated as
// false. Other non boolean values raise a QueryRuntimeException.
bool EvaluateFilter(ExpressionEvaluator &evaluator, Expression *filter) {
@ -2398,7 +2408,9 @@ class AggregateCursor : public Cursor {
public:
AggregateCursor(const Aggregate &self, database::GraphDbAccessor *db,
utils::MemoryResource *mem)
: self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {}
: self_(self),
input_cursor_(self_.input_->MakeCursor(db, mem)),
aggregation_(mem) {}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("Aggregate");
@ -2452,14 +2464,17 @@ class AggregateCursor : public Cursor {
// aggregation map. The vectors in an AggregationValue contain one element for
// each aggregation in this LogicalOp.
struct AggregationValue {
explicit AggregationValue(utils::MemoryResource *mem)
: counts_(mem), values_(mem), remember_(mem) {}
// how many input rows have been aggregated in respective values_ element so
// far
std::vector<int> counts_;
std::vector<int, utils::Allocator<int>> counts_;
// aggregated values. Initially Null (until at least one input row with a
// valid value gets processed)
std::vector<TypedValue> values_;
std::vector<TypedValue, utils::Allocator<TypedValue>> values_;
// remember values.
std::vector<TypedValue> remember_;
std::vector<TypedValue, utils::Allocator<TypedValue>> remember_;
};
const Aggregate &self_;
@ -2467,13 +2482,17 @@ class AggregateCursor : public Cursor {
// storage for aggregated data
// map key is the vector of group-by values
// map value is an AggregationValue struct
std::unordered_map<std::vector<TypedValue>, AggregationValue,
// use FNV collection hashing specialized for a vector of
// TypedValues
utils::FnvCollection<std::vector<TypedValue>, TypedValue,
TypedValue::Hash>,
// custom equality
TypedValueVectorEqual>
std::unordered_map<
std::vector<TypedValue, utils::Allocator<TypedValue>>, AggregationValue,
// use FNV collection hashing specialized for a vector of TypedValues
utils::FnvCollection<
std::vector<TypedValue, utils::Allocator<TypedValue>>, TypedValue,
TypedValue::Hash>,
// custom equality
TypedValueVectorAllocatorEqual,
utils::Allocator<
std::pair<const std::vector<TypedValue, utils::Allocator<TypedValue>>,
AggregationValue>>>
aggregation_;
// iterator over the accumulated cache
decltype(aggregation_.begin()) aggregation_it_ = aggregation_.begin();
@ -2515,12 +2534,14 @@ class AggregateCursor : public Cursor {
* Performs a single accumulation.
*/
void ProcessOne(const Frame &frame, ExpressionEvaluator *evaluator) {
std::vector<TypedValue> group_by;
auto *mem = aggregation_.get_allocator().GetMemoryResource();
std::vector<TypedValue, utils::Allocator<TypedValue>> group_by(mem);
group_by.reserve(self_.group_by_.size());
for (Expression *expression : self_.group_by_) {
group_by.emplace_back(expression->Accept(*evaluator));
}
auto &agg_value = aggregation_[group_by];
auto &agg_value =
aggregation_.try_emplace(std::move(group_by), mem).first->second;
EnsureInitialized(frame, &agg_value);
Update(evaluator, &agg_value);
}
@ -2690,7 +2711,6 @@ UniqueCursorPtr Aggregate::MakeCursor(database::GraphDbAccessor *db,
return MakeUniqueCursorPtr<AggregateCursor>(mem, *this, db, mem);
}
Skip::Skip(const std::shared_ptr<LogicalOperator> &input,
Expression *expression)
: input_(input), expression_(expression) {}
@ -3144,16 +3164,6 @@ void Unwind::UnwindCursor::Reset() {
input_value_it_ = input_value_.end();
}
struct TypedValueVectorAllocatorEqual {
bool operator()(
const std::vector<TypedValue, utils::Allocator<TypedValue>> &left,
const std::vector<TypedValue, utils::Allocator<TypedValue>> &right)
const {
return std::equal(left.begin(), left.end(), right.begin(), right.end(),
TypedValue::BoolEqual());
}
};
class DistinctCursor : public Cursor {
public:
DistinctCursor(const Distinct &self, database::GraphDbAccessor *db,

View File

@ -291,4 +291,54 @@ BENCHMARK_TEMPLATE(Accumulate, MonotonicBufferResource)
->Ranges({{4, 1U << 7U}, {512, 1U << 13U}})
->Unit(benchmark::kMicrosecond);
template <class TMemory>
// NOLINTNEXTLINE(google-runtime-references)
static void Aggregate(benchmark::State &state) {
query::AstStorage ast;
query::Parameters parameters;
database::GraphDb db;
AddVertices(&db, state.range(1));
query::SymbolTable symbol_table;
auto scan_all = std::make_shared<query::plan::ScanAll>(
nullptr, symbol_table.CreateSymbol("v", false));
std::vector<query::Symbol> symbols;
symbols.reserve(state.range(0));
std::vector<query::Expression *> group_by;
group_by.reserve(state.range(0));
std::vector<query::plan::Aggregate::Element> aggregations;
aggregations.reserve(state.range(0));
for (int i = 0; i < state.range(0); ++i) {
auto sym = symbol_table.CreateSymbol(std::to_string(i), false);
symbols.push_back(sym);
group_by.push_back(ast.Create<query::Identifier>(sym.name())->MapTo(sym));
aggregations.push_back(
{ast.Create<query::PrimitiveLiteral>(i), nullptr,
query::Aggregation::Op::SUM,
symbol_table.CreateSymbol("out" + std::to_string(i), false)});
}
query::plan::Aggregate aggregate(scan_all, aggregations, group_by, symbols);
auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
TMemory memory;
auto cursor = aggregate.MakeCursor(&dba, memory.get());
frame[symbols.front()] = 0; // initial group_by value
while (cursor->Pull(frame, execution_context))
frame[symbols.front()].ValueInt()++; // new group_by value
}
state.SetItemsProcessed(state.iterations());
}
BENCHMARK_TEMPLATE(Aggregate, NewDeleteResource)
->Ranges({{4, 1U << 7U}, {512, 1U << 13U}})
->Unit(benchmark::kMicrosecond);
BENCHMARK_TEMPLATE(Aggregate, MonotonicBufferResource)
->Ranges({{4, 1U << 7U}, {512, 1U << 13U}})
->Unit(benchmark::kMicrosecond);
BENCHMARK_MAIN();