From 2219dee6f606a00b0d9a5fe17b3984fda1b04547 Mon Sep 17 00:00:00 2001 From: gvolfing <gabor.volfinger@memgraph.io> Date: Mon, 6 Feb 2023 12:49:32 +0100 Subject: [PATCH 1/7] Add initial impl for EdgeUniquenessFilterCursor::PullMultiple --- src/query/v2/interpreter.cpp | 3 +- src/query/v2/plan/operator.cpp | 126 +++++++++++++++++++++++++++++---- src/query/v2/plan/operator.lcp | 7 ++ 3 files changed, 122 insertions(+), 14 deletions(-) diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index aa220d764..40c3fced4 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -812,7 +812,8 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n, const std::vector<Symbol> &output_symbols, std::map<std::string, TypedValue> *summary) { - auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple + // auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple + auto should_pull_multiple = false; if (should_pull_multiple) { return PullMultiple(stream, n, output_symbols, summary); } diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index aa9743878..8bd3544cf 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1207,28 +1207,128 @@ bool ContainsSameEdge(const TypedValue &a, const TypedValue &b) { return a.ValueEdge() == b.ValueEdge(); } + +bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector<Symbol> &previous_symbols) { + const auto &expand_value = frame[expand_symbol]; + for (const auto &previous_symbol : previous_symbols) { + const auto &previous_value = frame[previous_symbol]; + // This shouldn't raise a TypedValueException, because the planner + // makes sure these are all of the expected type. 
In case they are not + // an error should be raised long before this code is executed. + if (ContainsSameEdge(previous_value, expand_value)) return false; + } + return true; +} + } // namespace bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("EdgeUniquenessFilter"); - - auto expansion_ok = [&]() { - const auto &expand_value = frame[self_.expand_symbol_]; - for (const auto &previous_symbol : self_.previous_symbols_) { - const auto &previous_value = frame[previous_symbol]; - // This shouldn't raise a TypedValueException, because the planner - // makes sure these are all of the expected type. In case they are not - // an error should be raised long before this code is executed. - if (ContainsSameEdge(previous_value, expand_value)) return false; - } - return true; - }; + // // TODO (gvolfing) Make the simple Pull method use the function instead of the lambda as well. + // auto expansion_ok = [&]() { + // const auto &expand_value = frame[self_.expand_symbol_]; + // for (const auto &previous_symbol : self_.previous_symbols_) { + // const auto &previous_value = frame[previous_symbol]; + // // This shouldn't raise a TypedValueException, because the planner + // // makes sure these are all of the expected type. In case they are not + // // an error should be raised long before this code is executed. 
+ // if (ContainsSameEdge(previous_value, expand_value)) return false; + // } + // return true; + // }; while (input_cursor_->Pull(frame, context)) - if (expansion_ok()) return true; + if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) return true; return false; } +bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame &output_multi_frame, + ExecutionContext &context) { + SCOPED_PROFILE_OP("EdgeUniquenessFilterMF"); + + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe, + output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size()); + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + auto populated_any = false; + + while (true) { + switch (state_) { + case State::PullInput: { + if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { + state_ = State::Exhausted; + return populated_any; + } + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + + if (own_frames_it_ == own_frames_consumer_->end()) { + continue; + } + + state_ = State::PopulateOutput; + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + return populated_any; + } + + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; + continue; + } + + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end(); + ++own_frames_it_) { + auto &output_frame = *output_frame_it; + + if (IsExpansionOk(*own_frames_it_, self_.expand_symbol_, self_.previous_symbols_)) { + 
output_frame = *own_frames_it_; + populated_any = true; + } else { + own_frames_it_->MakeInvalid(); + } + ++output_frame_it; + + ///////////////////////////////////////////////// + /* + ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context, + context.request_router, storage::v3::View::NEW); + + std::vector<msgs::Value> pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + auto vertex_id = std::make_pair(label, std::move(pk)); + + if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) { + output_frame = *own_frames_it_; + output_frame[output_symbol_] = TypedValue(it->second); + populated_any = true; + ++output_frame_it; + } + own_frames_it_->MakeInvalid(); + */ + } + break; + } + case State::Exhausted: { + return populated_any; + } + } + } + return populated_any; +} + void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Shutdown() { input_cursor_->Shutdown(); } void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Reset() { input_cursor_->Reset(); } diff --git a/src/query/v2/plan/operator.lcp b/src/query/v2/plan/operator.lcp index 4f34cc061..ae02b5931 100644 --- a/src/query/v2/plan/operator.lcp +++ b/src/query/v2/plan/operator.lcp @@ -1570,12 +1570,19 @@ edge lists).") EdgeUniquenessFilterCursor(const EdgeUniquenessFilter &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; + bool PullMultiple(MultiFrame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; private: const EdgeUniquenessFilter &self_; const UniqueCursorPtr input_cursor_; + enum class State { PullInput, PopulateOutput, Exhausted }; + + State state_{State::PullInput}; + std::optional<MultiFrame> own_multi_frame_; + std::optional<ValidFramesConsumer> own_frames_consumer_; + ValidFramesConsumer::Iterator 
own_frames_it_; }; cpp<#) (:serialize (:slk)) From 7e99f32adb080bea62149d4ec200a5b7ea479236 Mon Sep 17 00:00:00 2001 From: gvolfing <gabor.volfinger@memgraph.io> Date: Mon, 6 Feb 2023 15:47:18 +0100 Subject: [PATCH 2/7] Remove commented-out, useless code --- src/query/v2/interpreter.cpp | 3 +-- src/query/v2/plan/operator.cpp | 37 +--------------------------------- 2 files changed, 2 insertions(+), 38 deletions(-) diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index 40c3fced4..aa220d764 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -812,8 +812,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n, const std::vector<Symbol> &output_symbols, std::map<std::string, TypedValue> *summary) { - // auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple - auto should_pull_multiple = false; + auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple if (should_pull_multiple) { return PullMultiple(stream, n, output_symbols, summary); } diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 5397d6735..beca89b13 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -761,7 +761,7 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { output_frame[output_symbol_] = TypedValue(it->second); populated_any = true; ++output_frame_it; - } + } own_frames_it_->MakeInvalid(); } break; @@ -1350,19 +1350,6 @@ bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector< bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("EdgeUniquenessFilter"); - // // TODO (gvolfing) Make the simple Pull method use the function instead of the lambda as well.
- // auto expansion_ok = [&]() { - // const auto &expand_value = frame[self_.expand_symbol_]; - // for (const auto &previous_symbol : self_.previous_symbols_) { - // const auto &previous_value = frame[previous_symbol]; - // // This shouldn't raise a TypedValueException, because the planner - // // makes sure these are all of the expected type. In case they are not - // // an error should be raised long before this code is executed. - // if (ContainsSameEdge(previous_value, expand_value)) return false; - // } - // return true; - // }; - while (input_cursor_->Pull(frame, context)) if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) return true; return false; @@ -1422,28 +1409,6 @@ bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame & own_frames_it_->MakeInvalid(); } ++output_frame_it; - - ///////////////////////////////////////////////// - /* - ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context, - context.request_router, storage::v3::View::NEW); - - std::vector<msgs::Value> pk; - for (auto *primary_property : primary_key_) { - pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); - } - - const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; - auto vertex_id = std::make_pair(label, std::move(pk)); - - if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) { - output_frame = *own_frames_it_; - output_frame[output_symbol_] = TypedValue(it->second); - populated_any = true; - ++output_frame_it; - } - own_frames_it_->MakeInvalid(); - */ } break; } From ac59e7f7e092fd357926f5faf5d845e8af19ca2f Mon Sep 17 00:00:00 2001 From: gvolfing <gabor.volfinger@memgraph.io> Date: Mon, 6 Feb 2023 15:54:07 +0100 Subject: [PATCH 3/7] Move loop variables incrementation into the same place --- src/query/v2/plan/operator.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index beca89b13..400ee4f53 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1399,7 +1399,7 @@ bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame & for (auto output_frame_it = output_frames_populator.begin(); output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end(); - ++own_frames_it_) { + ++own_frames_it_, ++output_frame_it) { auto &output_frame = *output_frame_it; if (IsExpansionOk(*own_frames_it_, self_.expand_symbol_, self_.previous_symbols_)) { @@ -1408,7 +1408,6 @@ bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame & } else { own_frames_it_->MakeInvalid(); } - ++output_frame_it; } break; } From 37f19867b088ee2a06cdafe36341eb84a5af37f2 Mon Sep 17 00:00:00 2001 From: gvolfing <gabor.volfinger@memgraph.io> Date: Tue, 7 Feb 2023 08:25:50 +0100 Subject: [PATCH 4/7] Make EdgeUniquenessFilterCursor impl simpler --- src/query/v2/plan/operator.cpp | 62 +++++----------------------------- src/query/v2/plan/operator.lcp | 6 ---- 2 files changed, 9 insertions(+), 59 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 400ee4f53..ec5413981 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1358,61 +1358,17 @@ bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(Frame &frame, Execut bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) { SCOPED_PROFILE_OP("EdgeUniquenessFilterMF"); - - if (!own_multi_frame_.has_value()) { - own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe, - output_multi_frame.GetMemoryResource())); - own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); - own_frames_it_ = own_frames_consumer_->begin(); - } - 
MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size()); - - auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); auto populated_any = false; - while (true) { - switch (state_) { - case State::PullInput: { - if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { - state_ = State::Exhausted; - return populated_any; - } - own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); - own_frames_it_ = own_frames_consumer_->begin(); - - if (own_frames_it_ == own_frames_consumer_->end()) { - continue; - } - - state_ = State::PopulateOutput; - break; - } - case State::PopulateOutput: { - if (!output_multi_frame.HasInvalidFrame()) { - return populated_any; - } - - if (own_frames_it_ == own_frames_consumer_->end()) { - state_ = State::PullInput; - continue; - } - - for (auto output_frame_it = output_frames_populator.begin(); - output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end(); - ++own_frames_it_, ++output_frame_it) { - auto &output_frame = *output_frame_it; - - if (IsExpansionOk(*own_frames_it_, self_.expand_symbol_, self_.previous_symbols_)) { - output_frame = *own_frames_it_; - populated_any = true; - } else { - own_frames_it_->MakeInvalid(); - } - } - break; - } - case State::Exhausted: { - return populated_any; + while (output_multi_frame.HasInvalidFrame()) { + if (!input_cursor_->PullMultiple(output_multi_frame, context)) { + return populated_any; + } + for (auto &frame : output_multi_frame.GetValidFramesConsumer()) { + if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) { + populated_any = true; + } else { + frame.MakeInvalid(); } } } diff --git a/src/query/v2/plan/operator.lcp b/src/query/v2/plan/operator.lcp index ae02b5931..110ba8a33 100644 --- a/src/query/v2/plan/operator.lcp +++ b/src/query/v2/plan/operator.lcp @@ -1577,12 +1577,6 @@ edge lists).") private: const EdgeUniquenessFilter &self_; 
const UniqueCursorPtr input_cursor_; - enum class State { PullInput, PopulateOutput, Exhausted }; - - State state_{State::PullInput}; - std::optional<MultiFrame> own_multi_frame_; - std::optional<ValidFramesConsumer> own_frames_consumer_; - ValidFramesConsumer::Iterator own_frames_it_; }; cpp<#) (:serialize (:slk)) From 25226cca920d8c93f603713909f8b266da4623f9 Mon Sep 17 00:00:00 2001 From: gvolfing <107616712+gvolfing@users.noreply.github.com> Date: Wed, 8 Feb 2023 11:41:43 +0100 Subject: [PATCH 5/7] Update src/query/v2/plan/operator.cpp Co-authored-by: Jure Bajic <jure.bajic@memgraph.com> --- src/query/v2/plan/operator.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index ec5413981..6fa9c4db9 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1335,15 +1335,12 @@ bool ContainsSameEdge(const TypedValue &a, const TypedValue &b) { } bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector<Symbol> &previous_symbols) { - const auto &expand_value = frame[expand_symbol]; - for (const auto &previous_symbol : previous_symbols) { - const auto &previous_value = frame[previous_symbol]; // This shouldn't raise a TypedValueException, because the planner // makes sure these are all of the expected type. In case they are not // an error should be raised long before this code is executed. 
- if (ContainsSameEdge(previous_value, expand_value)) return false; - } - return true; + return std::ranges::all_of(previous_symbols, [&expand_value = frame[expand_symbol]](const auto& previous_symbol) { + return ContainsSameEdge(previous_value, expand_value); + }); } } // namespace From 657279949aece28624900b012365adca8fa9d9a2 Mon Sep 17 00:00:00 2001 From: gvolfing <gabor.volfinger@memgraph.io> Date: Wed, 8 Feb 2023 12:13:46 +0100 Subject: [PATCH 6/7] Fix compile error --- src/query/v2/plan/operator.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 6fa9c4db9..4337add0d 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1335,12 +1335,14 @@ bool ContainsSameEdge(const TypedValue &a, const TypedValue &b) { } bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector<Symbol> &previous_symbols) { - // This shouldn't raise a TypedValueException, because the planner - // makes sure these are all of the expected type. In case they are not - // an error should be raised long before this code is executed. - return std::ranges::all_of(previous_symbols, [&expand_value = frame[expand_symbol]](const auto& previous_symbol) { - return ContainsSameEdge(previous_value, expand_value); - }); + // This shouldn't raise a TypedValueException, because the planner + // makes sure these are all of the expected type. In case they are not + // an error should be raised long before this code is executed. 
+ return std::ranges::all_of(previous_symbols, + [&frame, &expand_value = frame[expand_symbol]](const auto &previous_symbol) { + const auto &previous_value = frame[previous_symbol]; + return ContainsSameEdge(previous_value, expand_value); + }); } } // namespace From 096d1ce5f4d4e609614c3795fb22265abb692408 Mon Sep 17 00:00:00 2001 From: gvolfing <gabor.volfinger@memgraph.io> Date: Wed, 8 Feb 2023 12:57:21 +0100 Subject: [PATCH 7/7] Invert boolean logic when checking for unique edges --- src/query/v2/plan/operator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 4337add0d..73de9ee28 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1341,7 +1341,7 @@ bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector< return std::ranges::all_of(previous_symbols, [&frame, &expand_value = frame[expand_symbol]](const auto &previous_symbol) { const auto &previous_value = frame[previous_symbol]; - return ContainsSameEdge(previous_value, expand_value); + return !ContainsSameEdge(previous_value, expand_value); }); }