From 0075eee58b5a305cfe17d3ce5902eeb90ccb283a Mon Sep 17 00:00:00 2001
From: Teon Banek <teon.banek@memgraph.io>
Date: Thu, 16 May 2019 13:48:35 +0200
Subject: [PATCH] Use MemoryResource in AggregationCursor

Summary: Micro benchmarks show some improvement, but unfortunately not much.

Reviewers: mtomic, llugovic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2058
---
 src/query/plan/operator.cpp         | 58 +++++++++++++++++------------
 tests/benchmark/query/execution.cpp | 50 +++++++++++++++++++++++++
 2 files changed, 84 insertions(+), 24 deletions(-)

diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index b234b2b3e..4bfab2408 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -68,6 +68,16 @@ bool TypedValueVectorEqual::operator()(
 
 namespace {
 
+struct TypedValueVectorAllocatorEqual {
+  bool operator()(
+      const std::vector<TypedValue, utils::Allocator<TypedValue>> &left,
+      const std::vector<TypedValue, utils::Allocator<TypedValue>> &right)
+      const {
+    return std::equal(left.begin(), left.end(), right.begin(), right.end(),
+                      TypedValue::BoolEqual());
+  }
+};
+
 // Returns boolean result of evaluating filter expression. Null is treated as
 // false. Other non boolean values raise a QueryRuntimeException.
 bool EvaluateFilter(ExpressionEvaluator &evaluator, Expression *filter) {
@@ -2398,7 +2408,9 @@ class AggregateCursor : public Cursor {
  public:
   AggregateCursor(const Aggregate &self, database::GraphDbAccessor *db,
                   utils::MemoryResource *mem)
-      : self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {}
+      : self_(self),
+        input_cursor_(self_.input_->MakeCursor(db, mem)),
+        aggregation_(mem) {}
 
   bool Pull(Frame &frame, ExecutionContext &context) override {
     SCOPED_PROFILE_OP("Aggregate");
@@ -2452,14 +2464,17 @@ class AggregateCursor : public Cursor {
   // aggregation map. The vectors in an AggregationValue contain one element for
   // each aggregation in this LogicalOp.
   struct AggregationValue {
+    explicit AggregationValue(utils::MemoryResource *mem)
+        : counts_(mem), values_(mem), remember_(mem) {}
+
     // how many input rows have been aggregated in respective values_ element so
     // far
-    std::vector<int> counts_;
+    std::vector<int, utils::Allocator<int>> counts_;
     // aggregated values. Initially Null (until at least one input row with a
     // valid value gets processed)
-    std::vector<TypedValue> values_;
+    std::vector<TypedValue, utils::Allocator<TypedValue>> values_;
     // remember values.
-    std::vector<TypedValue> remember_;
+    std::vector<TypedValue, utils::Allocator<TypedValue>> remember_;
   };
 
   const Aggregate &self_;
@@ -2467,13 +2482,17 @@ class AggregateCursor : public Cursor {
   // storage for aggregated data
   // map key is the vector of group-by values
   // map value is an AggregationValue struct
-  std::unordered_map<std::vector<TypedValue>, AggregationValue,
-                     // use FNV collection hashing specialized for a vector of
-                     // TypedValues
-                     utils::FnvCollection<std::vector<TypedValue>, TypedValue,
-                                          TypedValue::Hash>,
-                     // custom equality
-                     TypedValueVectorEqual>
+  std::unordered_map<
+      std::vector<TypedValue, utils::Allocator<TypedValue>>, AggregationValue,
+      // use FNV collection hashing specialized for a vector of TypedValues
+      utils::FnvCollection<
+          std::vector<TypedValue, utils::Allocator<TypedValue>>, TypedValue,
+          TypedValue::Hash>,
+      // custom equality
+      TypedValueVectorAllocatorEqual,
+      utils::Allocator<
+          std::pair<const std::vector<TypedValue, utils::Allocator<TypedValue>>,
+                    AggregationValue>>>
       aggregation_;
   // iterator over the accumulated cache
   decltype(aggregation_.begin()) aggregation_it_ = aggregation_.begin();
@@ -2515,12 +2534,14 @@ class AggregateCursor : public Cursor {
    * Performs a single accumulation.
    */
   void ProcessOne(const Frame &frame, ExpressionEvaluator *evaluator) {
-    std::vector<TypedValue> group_by;
+    auto *mem = aggregation_.get_allocator().GetMemoryResource();
+    std::vector<TypedValue, utils::Allocator<TypedValue>> group_by(mem);
     group_by.reserve(self_.group_by_.size());
     for (Expression *expression : self_.group_by_) {
       group_by.emplace_back(expression->Accept(*evaluator));
     }
-    auto &agg_value = aggregation_[group_by];
+    auto &agg_value =
+        aggregation_.try_emplace(std::move(group_by), mem).first->second;
     EnsureInitialized(frame, &agg_value);
     Update(evaluator, &agg_value);
   }
@@ -2690,7 +2711,6 @@ UniqueCursorPtr Aggregate::MakeCursor(database::GraphDbAccessor *db,
   return MakeUniqueCursorPtr<AggregateCursor>(mem, *this, db, mem);
 }
 
-
 Skip::Skip(const std::shared_ptr<LogicalOperator> &input,
            Expression *expression)
     : input_(input), expression_(expression) {}
@@ -3144,16 +3164,6 @@ void Unwind::UnwindCursor::Reset() {
   input_value_it_ = input_value_.end();
 }
 
-struct TypedValueVectorAllocatorEqual {
-  bool operator()(
-      const std::vector<TypedValue, utils::Allocator<TypedValue>> &left,
-      const std::vector<TypedValue, utils::Allocator<TypedValue>> &right)
-      const {
-    return std::equal(left.begin(), left.end(), right.begin(), right.end(),
-                      TypedValue::BoolEqual());
-  }
-};
-
 class DistinctCursor : public Cursor {
  public:
   DistinctCursor(const Distinct &self, database::GraphDbAccessor *db,
diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp
index 4c9309076..e4b2f71b3 100644
--- a/tests/benchmark/query/execution.cpp
+++ b/tests/benchmark/query/execution.cpp
@@ -291,4 +291,54 @@ BENCHMARK_TEMPLATE(Accumulate, MonotonicBufferResource)
     ->Ranges({{4, 1U << 7U}, {512, 1U << 13U}})
     ->Unit(benchmark::kMicrosecond);
 
+template <class TMemory>
+// NOLINTNEXTLINE(google-runtime-references)
+static void Aggregate(benchmark::State &state) {
+  query::AstStorage ast;
+  query::Parameters parameters;
+  database::GraphDb db;
+  AddVertices(&db, state.range(1));
+  query::SymbolTable symbol_table;
+  auto scan_all = std::make_shared<query::plan::ScanAll>(
+      nullptr, symbol_table.CreateSymbol("v", false));
+  std::vector<query::Symbol> symbols;
+  symbols.reserve(state.range(0));
+  std::vector<query::Expression *> group_by;
+  group_by.reserve(state.range(0));
+  std::vector<query::plan::Aggregate::Element> aggregations;
+  aggregations.reserve(state.range(0));
+  for (int i = 0; i < state.range(0); ++i) {
+    auto sym = symbol_table.CreateSymbol(std::to_string(i), false);
+    symbols.push_back(sym);
+    group_by.push_back(ast.Create<query::Identifier>(sym.name())->MapTo(sym));
+    aggregations.push_back(
+        {ast.Create<query::PrimitiveLiteral>(i), nullptr,
+         query::Aggregation::Op::SUM,
+         symbol_table.CreateSymbol("out" + std::to_string(i), false)});
+  }
+  query::plan::Aggregate aggregate(scan_all, aggregations, group_by, symbols);
+  auto dba = db.Access();
+  query::Frame frame(symbol_table.max_position());
+  // Nothing should be used from the EvaluationContext, so leave it empty.
+  query::EvaluationContext evaluation_context;
+  while (state.KeepRunning()) {
+    query::ExecutionContext execution_context{&dba, symbol_table,
+                                              evaluation_context};
+    TMemory memory;
+    auto cursor = aggregate.MakeCursor(&dba, memory.get());
+    frame[symbols.front()] = 0;  // initial group_by value
+    while (cursor->Pull(frame, execution_context))
+      frame[symbols.front()].ValueInt()++;  // new group_by value
+  }
+  state.SetItemsProcessed(state.iterations());
+}
+
+BENCHMARK_TEMPLATE(Aggregate, NewDeleteResource)
+    ->Ranges({{4, 1U << 7U}, {512, 1U << 13U}})
+    ->Unit(benchmark::kMicrosecond);
+
+BENCHMARK_TEMPLATE(Aggregate, MonotonicBufferResource)
+    ->Ranges({{4, 1U << 7U}, {512, 1U << 13U}})
+    ->Unit(benchmark::kMicrosecond);
+
 BENCHMARK_MAIN();