From 9589dd97b676896a062e3b2dc9f566de5524db8e Mon Sep 17 00:00:00 2001 From: jeremy Date: Fri, 30 Dec 2022 16:21:41 +0100 Subject: [PATCH 1/8] Impl and correct aggregate --- src/query/v2/multiframe.hpp | 2 +- src/query/v2/plan/operator.cpp | 116 +++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp index aeacda7d3..0365b449f 100644 --- a/src/query/v2/multiframe.hpp +++ b/src/query/v2/multiframe.hpp @@ -168,7 +168,7 @@ class ValidFramesModifier { Iterator &operator++() { do { ptr_++; - } while (*this != iterator_wrapper_->end() && ptr_->IsValid()); + } while (*this != iterator_wrapper_->end() && !ptr_->IsValid()); return *this; } diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index b6bbf9ce1..43376aaed 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1234,6 +1234,55 @@ class AggregateCursor : public Cursor { return true; } + void PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP("AggregateMF"); + + if (!pulled_all_input_) { + while (!pulled_all_input_) { + ProcessAll(multi_frame, &context); + } + multi_frame.MakeAllFramesInvalid(); + aggregation_it_ = aggregation_.begin(); + + // in case there is no input and no group_bys we need to return true + // just this once + if (aggregation_.empty() && self_.group_by_.empty()) { + auto frame = multi_frame.GetFirstFrame(); + frame.MakeValid(); + auto *pull_memory = context.evaluation_context.memory; + // place default aggregation values on the frame + for (const auto &elem : self_.aggregations_) { + frame[elem.output_sym] = DefaultAggregationOpValue(elem, pull_memory); + } + // place null as remember values on the frame + for (const Symbol &remember_sym : self_.remember_) { + frame[remember_sym] = TypedValue(pull_memory); + } + return; + } + } + + if (aggregation_it_ == aggregation_.end()) { + return; + } + + // place aggregation values on the frame + auto &frame = multi_frame.GetFirstFrame(); + frame.MakeValid(); + auto aggregation_values_it = aggregation_it_->second.values_.begin(); + for (const auto &aggregation_elem : self_.aggregations_) { + frame[aggregation_elem.output_sym] = *aggregation_values_it++; + } + + // place remember values on the frame + auto remember_values_it = aggregation_it_->second.remember_.begin(); + for (const Symbol &remember_sym : self_.remember_) { + frame[remember_sym] = *remember_values_it++; + } + + aggregation_it_++; + } + void Shutdown() override { input_cursor_->Shutdown(); } void Reset() override { @@ -1312,6 +1361,36 @@ class AggregateCursor : public Cursor { } } + void ProcessAll(MultiFrame &multi_frame, ExecutionContext *context) { + input_cursor_->PullMultiple(multi_frame, *context); + auto valid_frames_modifier = + multi_frame.GetValidFramesConsumer(); // consumer is needed i.o. reader because of the evaluator + if (valid_frames_modifier.begin() == valid_frames_modifier.end()) { + // There are no valid frames, we stop + pulled_all_input_ = true; + return; + } + + for (auto &frame : valid_frames_modifier) { + ExpressionEvaluator evaluator(&frame, context->symbol_table, context->evaluation_context, context->request_router, + storage::v3::View::NEW); + ProcessOne(frame, &evaluator); + } + + // calculate AVG aggregations (so far they have only been summed) + for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) { + if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue; + for (auto &kv : aggregation_) { + AggregationValue &agg_value = kv.second; + auto count = agg_value.counts_[pos]; + auto *pull_memory = context->evaluation_context.memory; + if (count > 0) { + agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast(count), pull_memory); + } + } + } + } + /** * Performs a single accumulation. */ @@ -1327,6 +1406,21 @@ class AggregateCursor : public Cursor { Update(evaluator, &agg_value); } + /** + * Performs a single accumulation. + */ + void ProcessOne(FrameWithValidity &frame, ExpressionEvaluator *evaluator) { + auto *mem = aggregation_.get_allocator().GetMemoryResource(); + utils::pmr::vector group_by(mem); + group_by.reserve(self_.group_by_.size()); + for (Expression *expression : self_.group_by_) { + group_by.emplace_back(expression->Accept(*evaluator)); + } + auto &agg_value = aggregation_.try_emplace(std::move(group_by), mem).first->second; + EnsureInitialized(frame, &agg_value); + Update(evaluator, &agg_value); + } + /** Ensures the new AggregationValue has been initialized. This means * that the value vectors are filled with an appropriate number of Nulls, * counts are set to 0 and remember values are remembered. @@ -1343,6 +1437,28 @@ class AggregateCursor : public Cursor { for (const Symbol &remember_sym : self_.remember_) agg_value->remember_.push_back(frame[remember_sym]); } + /** Ensures the new AggregationValue has been initialized. This means + * that the value vectors are filled with an appropriate number of Nulls, + * counts are set to 0 and remember values are remembered. + */ + void EnsureInitialized(FrameWithValidity &frame, AggregateCursor::AggregationValue *agg_value) const { + if (!agg_value->values_.empty()) { + frame.MakeInvalid(); + return; + } + + for (const auto &agg_elem : self_.aggregations_) { + auto *mem = agg_value->values_.get_allocator().GetMemoryResource(); + agg_value->values_.emplace_back(DefaultAggregationOpValue(agg_elem, mem)); + } + agg_value->counts_.resize(self_.aggregations_.size(), 0); + + for (const Symbol &remember_sym : self_.remember_) { + agg_value->remember_.push_back(frame[remember_sym]); + } + frame.MakeInvalid(); + } + /** Updates the given AggregationValue with new data. Assumes that * the AggregationValue has been initialized */ void Update(ExpressionEvaluator *evaluator, AggregateCursor::AggregationValue *agg_value) { From 883922dba5f2308fb3e64fc859016989d5c22b41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Fri, 27 Jan 2023 13:11:55 +0100 Subject: [PATCH 2/8] Eliminate warning about deprecated macro --- tests/unit/query_v2_plan.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/query_v2_plan.cpp b/tests/unit/query_v2_plan.cpp index 8fc5e11c7..c782f9127 100644 --- a/tests/unit/query_v2_plan.cpp +++ b/tests/unit/query_v2_plan.cpp @@ -86,7 +86,7 @@ class TestPlanner : public ::testing::Test {}; using PlannerTypes = ::testing::Types; -TYPED_TEST_CASE(TestPlanner, PlannerTypes); +TYPED_TEST_SUITE(TestPlanner, PlannerTypes); TYPED_TEST(TestPlanner, MatchFilterPropIsNotNull) { const char *prim_label_name = "prim_label_one"; From c12a5a901909adea92ee8fd007345f64f50b7260 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Fri, 27 Jan 2023 16:50:16 +0100 Subject: [PATCH 3/8] Make multi-create queries work --- src/query/v2/plan/operator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 95a7f6c40..a11b75d4f 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -202,7 +202,7 @@ class DistributedCreateNodeCursor : public Cursor { request_router->CreateVertices(NodeCreationInfoToRequests(context, multi_frame)); } PlaceNodesOnTheMultiFrame(multi_frame, context); - return false; + return true; } void Shutdown() override { input_cursor_->Shutdown(); } From 2ecf580ae73890012057e1c152422096e21df047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Fri, 27 Jan 2023 16:50:58 +0100 Subject: [PATCH 4/8] Eliminate warnings --- src/query/v2/plan/operator.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index a11b75d4f..0422db14c 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1566,7 +1566,7 @@ class AggregateCursor : public Cursor { for (; count_it < agg_value->counts_.end(); count_it++, value_it++, agg_elem_it++) { // COUNT(*) is the only case where input expression is optional // handle it here - auto input_expr_ptr = agg_elem_it->value; + auto *input_expr_ptr = agg_elem_it->value; if (!input_expr_ptr) { *count_it += 1; *value_it = *count_it; @@ -1657,7 +1657,7 @@ class AggregateCursor : public Cursor { /** Checks if the given TypedValue is legal in MIN and MAX. If not * an appropriate exception is thrown. */ - void EnsureOkForMinMax(const TypedValue &value) const { + static void EnsureOkForMinMax(const TypedValue &value) { switch (value.type()) { case TypedValue::Type::Bool: case TypedValue::Type::Int: @@ -1673,7 +1673,7 @@ class AggregateCursor : public Cursor { /** Checks if the given TypedValue is legal in AVG and SUM. If not * an appropriate exception is thrown. */ - void EnsureOkForAvgSum(const TypedValue &value) const { + static void EnsureOkForAvgSum(const TypedValue &value) { switch (value.type()) { case TypedValue::Type::Int: case TypedValue::Type::Double: From d36c0cc4243a8a2eb312f02534ac9f60501349ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Fri, 27 Jan 2023 16:52:46 +0100 Subject: [PATCH 5/8] Refactor `AggregateCursor::ProcessAll` --- src/query/v2/plan/operator.cpp | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 0422db14c..358a638be 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1324,14 +1324,13 @@ class AggregateCursor : public Cursor { return true; } - void PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override { + bool PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("AggregateMF"); if (!pulled_all_input_) { - while (!pulled_all_input_) { - ProcessAll(multi_frame, &context); - } - multi_frame.MakeAllFramesInvalid(); + ProcessAll(multi_frame, &context); + pulled_all_input_ = true; + MG_ASSERT(!multi_frame.HasValidFrame(), "ProcessAll didn't consumed all input frames!"); aggregation_it_ = aggregation_.begin(); // in case there is no input and no group_bys we need to return true @@ -1348,12 +1347,12 @@ class AggregateCursor : public Cursor { for (const Symbol &remember_sym : self_.remember_) { frame[remember_sym] = TypedValue(pull_memory); } - return; + return true; } } if (aggregation_it_ == aggregation_.end()) { - return; + return false; } // place aggregation values on the frame @@ -1371,6 +1370,7 @@ class AggregateCursor : public Cursor { } aggregation_it_++; + return true; } void Shutdown() override { input_cursor_->Shutdown(); } @@ -1452,19 +1452,15 @@ class AggregateCursor : public Cursor { } void ProcessAll(MultiFrame &multi_frame, ExecutionContext *context) { - input_cursor_->PullMultiple(multi_frame, *context); - auto valid_frames_modifier = - multi_frame.GetValidFramesConsumer(); // consumer is needed i.o. reader because of the evaluator - if (valid_frames_modifier.begin() == valid_frames_modifier.end()) { - // There are no valid frames, we stop - pulled_all_input_ = true; - return; - } + while (input_cursor_->PullMultiple(multi_frame, *context)) { + auto valid_frames_modifier = + multi_frame.GetValidFramesConsumer(); // consumer is needed i.o. reader because of the evaluator - for (auto &frame : valid_frames_modifier) { - ExpressionEvaluator evaluator(&frame, context->symbol_table, context->evaluation_context, context->request_router, - storage::v3::View::NEW); - ProcessOne(frame, &evaluator); + for (auto &frame : valid_frames_modifier) { + ExpressionEvaluator evaluator(&frame, context->symbol_table, context->evaluation_context, + context->request_router, storage::v3::View::NEW); + ProcessOne(frame, &evaluator); + } } // calculate AVG aggregations (so far they have only been summed) From 44fc2d01c769a2fbde972b26f3cdbaae85912327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Fri, 27 Jan 2023 17:04:40 +0100 Subject: [PATCH 6/8] Unify logic between multi and single frame pull --- src/query/v2/plan/operator.cpp | 49 +++++++++------------------------- 1 file changed, 13 insertions(+), 36 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 358a638be..f70b4b6b1 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1437,18 +1437,7 @@ class AggregateCursor : public Cursor { ProcessOne(*frame, &evaluator); } - // calculate AVG aggregations (so far they have only been summed) - for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) { - if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue; - for (auto &kv : aggregation_) { - AggregationValue &agg_value = kv.second; - auto count = agg_value.counts_[pos]; - auto *pull_memory = context->evaluation_context.memory; - if (count > 0) { - agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast(count), pull_memory); - } - } - } + CalculateAverages(*context); } void ProcessAll(MultiFrame &multi_frame, ExecutionContext *context) { @@ -1463,18 +1452,7 @@ class AggregateCursor : public Cursor { } } - // calculate AVG aggregations (so far they have only been summed) - for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) { - if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue; - for (auto &kv : aggregation_) { - AggregationValue &agg_value = kv.second; - auto count = agg_value.counts_[pos]; - auto *pull_memory = context->evaluation_context.memory; - if (count > 0) { - agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast(count), pull_memory); - } - } - } + CalculateAverages(*context); } /** @@ -1492,19 +1470,18 @@ class AggregateCursor : public Cursor { Update(evaluator, &agg_value); } - /** - * Performs a single accumulation. - */ - void ProcessOne(FrameWithValidity &frame, ExpressionEvaluator *evaluator) { - auto *mem = aggregation_.get_allocator().GetMemoryResource(); - utils::pmr::vector group_by(mem); - group_by.reserve(self_.group_by_.size()); - for (Expression *expression : self_.group_by_) { - group_by.emplace_back(expression->Accept(*evaluator)); + void CalculateAverages(ExecutionContext &context) { + for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) { + if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue; + for (auto &kv : aggregation_) { + AggregationValue &agg_value = kv.second; + auto count = agg_value.counts_[pos]; + auto *pull_memory = context.evaluation_context.memory; + if (count > 0) { + agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast(count), pull_memory); + } + } } - auto &agg_value = aggregation_.try_emplace(std::move(group_by), mem).first->second; - EnsureInitialized(frame, &agg_value); - Update(evaluator, &agg_value); } /** Ensures the new AggregationValue has been initialized. This means From 9214c715e255b6beea7c4af0d3aaab3d639af085 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Fri, 27 Jan 2023 17:05:03 +0100 Subject: [PATCH 7/8] Address review comment --- src/query/v2/plan/operator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index f70b4b6b1..c62c7f1bb 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1320,7 +1320,7 @@ class AggregateCursor : public Cursor { auto remember_values_it = aggregation_it_->second.remember_.begin(); for (const Symbol &remember_sym : self_.remember_) frame[remember_sym] = *remember_values_it++; - aggregation_it_++; + ++aggregation_it_; return true; } From fd047e7303d988f7f5ad9071acdf1f55ba99cd04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Fri, 27 Jan 2023 17:10:21 +0100 Subject: [PATCH 8/8] Unify even more logic --- src/query/v2/plan/operator.cpp | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index c62c7f1bb..168266ab4 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1500,28 +1500,6 @@ class AggregateCursor : public Cursor { for (const Symbol &remember_sym : self_.remember_) agg_value->remember_.push_back(frame[remember_sym]); } - /** Ensures the new AggregationValue has been initialized. This means - * that the value vectors are filled with an appropriate number of Nulls, - * counts are set to 0 and remember values are remembered. - */ - void EnsureInitialized(FrameWithValidity &frame, AggregateCursor::AggregationValue *agg_value) const { - if (!agg_value->values_.empty()) { - frame.MakeInvalid(); - return; - } - - for (const auto &agg_elem : self_.aggregations_) { - auto *mem = agg_value->values_.get_allocator().GetMemoryResource(); - agg_value->values_.emplace_back(DefaultAggregationOpValue(agg_elem, mem)); - } - agg_value->counts_.resize(self_.aggregations_.size(), 0); - - for (const Symbol &remember_sym : self_.remember_) { - agg_value->remember_.push_back(frame[remember_sym]); - } - frame.MakeInvalid(); - } - /** Updates the given AggregationValue with new data. Assumes that * the AggregationValue has been initialized */ void Update(ExpressionEvaluator *evaluator, AggregateCursor::AggregationValue *agg_value) {