Merge pull request #763 from memgraph/T1235-MG-implement-EdgeUniquenessFilter-with-PullMultiple

Implement edge uniqueness filter with pull multiple
This commit is contained in:
János Benjamin Antal 2023-02-08 13:40:15 +01:00 committed by GitHub
commit a02abc8f79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 15 deletions

View File

@ -761,7 +761,7 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
output_frame[output_symbol_] = TypedValue(it->second);
populated_any = true;
++output_frame_it;
}
}
own_frames_it_->MakeInvalid();
}
break;
@ -1333,28 +1333,47 @@ bool ContainsSameEdge(const TypedValue &a, const TypedValue &b) {
return a.ValueEdge() == b.ValueEdge();
}
bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector<Symbol> &previous_symbols) {
// This shouldn't raise a TypedValueException, because the planner
// makes sure these are all of the expected type. In case they are not
// an error should be raised long before this code is executed.
return std::ranges::all_of(previous_symbols,
[&frame, &expand_value = frame[expand_symbol]](const auto &previous_symbol) {
const auto &previous_value = frame[previous_symbol];
return !ContainsSameEdge(previous_value, expand_value);
});
}
} // namespace
bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(Frame &frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("EdgeUniquenessFilter");
auto expansion_ok = [&]() {
const auto &expand_value = frame[self_.expand_symbol_];
for (const auto &previous_symbol : self_.previous_symbols_) {
const auto &previous_value = frame[previous_symbol];
// This shouldn't raise a TypedValueException, because the planner
// makes sure these are all of the expected type. In case they are not
// an error should be raised long before this code is executed.
if (ContainsSameEdge(previous_value, expand_value)) return false;
}
return true;
};
while (input_cursor_->Pull(frame, context))
if (expansion_ok()) return true;
if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) return true;
return false;
}
bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame &output_multi_frame,
ExecutionContext &context) {
SCOPED_PROFILE_OP("EdgeUniquenessFilterMF");
auto populated_any = false;
while (output_multi_frame.HasInvalidFrame()) {
if (!input_cursor_->PullMultiple(output_multi_frame, context)) {
return populated_any;
}
for (auto &frame : output_multi_frame.GetValidFramesConsumer()) {
if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) {
populated_any = true;
} else {
frame.MakeInvalid();
}
}
}
return populated_any;
}
void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Shutdown() { input_cursor_->Shutdown(); }
void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Reset() { input_cursor_->Reset(); }

View File

@ -1570,6 +1570,7 @@ edge lists).")
EdgeUniquenessFilterCursor(const EdgeUniquenessFilter &,
utils::MemoryResource *);
bool Pull(Frame &, ExecutionContext &) override;
bool PullMultiple(MultiFrame &, ExecutionContext &) override;
void Shutdown() override;
void Reset() override;