Add initial impl for EdgeUniquenessFilterCursor::PullMultiple
This commit is contained in:
parent
303362d41c
commit
2219dee6f6
src/query/v2
@ -812,7 +812,8 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple
|
||||
// auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple
|
||||
auto should_pull_multiple = false;
|
||||
if (should_pull_multiple) {
|
||||
return PullMultiple(stream, n, output_symbols, summary);
|
||||
}
|
||||
|
@ -1207,28 +1207,128 @@ bool ContainsSameEdge(const TypedValue &a, const TypedValue &b) {
|
||||
|
||||
return a.ValueEdge() == b.ValueEdge();
|
||||
}
|
||||
|
||||
bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector<Symbol> &previous_symbols) {
|
||||
const auto &expand_value = frame[expand_symbol];
|
||||
for (const auto &previous_symbol : previous_symbols) {
|
||||
const auto &previous_value = frame[previous_symbol];
|
||||
// This shouldn't raise a TypedValueException, because the planner
|
||||
// makes sure these are all of the expected type. In case they are not
|
||||
// an error should be raised long before this code is executed.
|
||||
if (ContainsSameEdge(previous_value, expand_value)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("EdgeUniquenessFilter");
|
||||
|
||||
auto expansion_ok = [&]() {
|
||||
const auto &expand_value = frame[self_.expand_symbol_];
|
||||
for (const auto &previous_symbol : self_.previous_symbols_) {
|
||||
const auto &previous_value = frame[previous_symbol];
|
||||
// This shouldn't raise a TypedValueException, because the planner
|
||||
// makes sure these are all of the expected type. In case they are not
|
||||
// an error should be raised long before this code is executed.
|
||||
if (ContainsSameEdge(previous_value, expand_value)) return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
// // TODO (gvolfing) Make the simple Pull method use the function instead of the lambda as well.
|
||||
// auto expansion_ok = [&]() {
|
||||
// const auto &expand_value = frame[self_.expand_symbol_];
|
||||
// for (const auto &previous_symbol : self_.previous_symbols_) {
|
||||
// const auto &previous_value = frame[previous_symbol];
|
||||
// // This shouldn't raise a TypedValueException, because the planner
|
||||
// // makes sure these are all of the expected type. In case they are not
|
||||
// // an error should be raised long before this code is executed.
|
||||
// if (ContainsSameEdge(previous_value, expand_value)) return false;
|
||||
// }
|
||||
// return true;
|
||||
// };
|
||||
|
||||
while (input_cursor_->Pull(frame, context))
|
||||
if (expansion_ok()) return true;
|
||||
if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame &output_multi_frame,
|
||||
ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("EdgeUniquenessFilterMF");
|
||||
|
||||
if (!own_multi_frame_.has_value()) {
|
||||
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe,
|
||||
output_multi_frame.GetMemoryResource()));
|
||||
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
}
|
||||
MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size());
|
||||
|
||||
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||
auto populated_any = false;
|
||||
|
||||
while (true) {
|
||||
switch (state_) {
|
||||
case State::PullInput: {
|
||||
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
|
||||
state_ = State::Exhausted;
|
||||
return populated_any;
|
||||
}
|
||||
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
state_ = State::PopulateOutput;
|
||||
break;
|
||||
}
|
||||
case State::PopulateOutput: {
|
||||
if (!output_multi_frame.HasInvalidFrame()) {
|
||||
return populated_any;
|
||||
}
|
||||
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
state_ = State::PullInput;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto output_frame_it = output_frames_populator.begin();
|
||||
output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end();
|
||||
++own_frames_it_) {
|
||||
auto &output_frame = *output_frame_it;
|
||||
|
||||
if (IsExpansionOk(*own_frames_it_, self_.expand_symbol_, self_.previous_symbols_)) {
|
||||
output_frame = *own_frames_it_;
|
||||
populated_any = true;
|
||||
} else {
|
||||
own_frames_it_->MakeInvalid();
|
||||
}
|
||||
++output_frame_it;
|
||||
|
||||
/////////////////////////////////////////////////
|
||||
/*
|
||||
ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context,
|
||||
context.request_router, storage::v3::View::NEW);
|
||||
|
||||
std::vector<msgs::Value> pk;
|
||||
for (auto *primary_property : primary_key_) {
|
||||
pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
|
||||
}
|
||||
|
||||
const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
|
||||
auto vertex_id = std::make_pair(label, std::move(pk));
|
||||
|
||||
if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) {
|
||||
output_frame = *own_frames_it_;
|
||||
output_frame[output_symbol_] = TypedValue(it->second);
|
||||
populated_any = true;
|
||||
++output_frame_it;
|
||||
}
|
||||
own_frames_it_->MakeInvalid();
|
||||
*/
|
||||
}
|
||||
break;
|
||||
}
|
||||
case State::Exhausted: {
|
||||
return populated_any;
|
||||
}
|
||||
}
|
||||
}
|
||||
return populated_any;
|
||||
}
|
||||
|
||||
void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
@ -1570,12 +1570,19 @@ edge lists).")
|
||||
EdgeUniquenessFilterCursor(const EdgeUniquenessFilter &,
|
||||
utils::MemoryResource *);
|
||||
bool Pull(Frame &, ExecutionContext &) override;
|
||||
bool PullMultiple(MultiFrame &, ExecutionContext &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
const EdgeUniquenessFilter &self_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
enum class State { PullInput, PopulateOutput, Exhausted };
|
||||
|
||||
State state_{State::PullInput};
|
||||
std::optional<MultiFrame> own_multi_frame_;
|
||||
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator own_frames_it_;
|
||||
};
|
||||
cpp<#)
|
||||
(:serialize (:slk))
|
||||
|
Loading…
Reference in New Issue
Block a user