Add simplest version of optional match

This commit is contained in:
János Benjamin Antal 2023-03-20 13:21:25 +01:00
parent 3672dd2b4f
commit a6e79fc9d9
2 changed files with 184 additions and 67 deletions

View File

@ -338,18 +338,31 @@ bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
return false;
}
bool Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
bool Once::OnceCursor::PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("OnceMF");
if (!did_pull_) {
auto &first_frame = multi_frame.GetFirstFrame();
first_frame.MakeValid();
did_pull_ = true;
if (pushed_down_multi_frame_.has_value()) {
auto pushed_down_consumer = pushed_down_multi_frame_->GetValidFramesConsumer();
auto output_populator = output_multi_frame.GetInvalidFramesPopulator();
auto consumer_it = pushed_down_consumer.begin();
auto populator_it = output_populator.begin();
for (; consumer_it != pushed_down_consumer.end(); ++consumer_it, ++populator_it) {
MG_ASSERT(populator_it != output_populator.end());
*populator_it = std::move(*consumer_it);
}
} else {
auto &first_frame = output_multi_frame.GetFirstFrame();
first_frame.MakeValid();
}
return true;
}
return false;
}
void Once::OnceCursor::PushDown(const MultiFrame &multi_frame) { pushed_down_multi_frame_.emplace(multi_frame); }
UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::OnceOperator);
@ -573,6 +586,8 @@ class DistributedScanAllAndFilterCursor : public Cursor {
return populated_any;
};
void PushDown(const MultiFrame &multi_frame) override { input_cursor_->PushDown(multi_frame); }
void Shutdown() override { input_cursor_->Shutdown(); }
void ResetExecutionState() {
@ -2104,6 +2119,8 @@ bool Optional::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
return visitor.PostVisit(*this);
}
class OptionalCursor;
UniqueCursorPtr Optional::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::OptionalOperator);
@ -2117,57 +2134,172 @@ std::vector<Symbol> Optional::ModifiedSymbols(const SymbolTable &table) const {
return symbols;
}
Optional::OptionalCursor::OptionalCursor(const Optional &self, utils::MemoryResource *mem)
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), optional_cursor_(self.optional_->MakeCursor(mem)) {}
class OptionalCursor : public Cursor {
public:
OptionalCursor(const Optional &self, utils::MemoryResource *mem)
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), optional_cursor_(self.optional_->MakeCursor(mem)) {}
bool Optional::OptionalCursor::Pull(Frame &frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("Optional");
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("Optional");
while (true) {
if (pull_input_) {
if (input_cursor_->Pull(frame, context)) {
// after a successful input from the input
// reset optional_ (it's expand iterators maintain state)
optional_cursor_->Reset();
} else
// input is exhausted, we're done
return false;
}
// pull from the optional_ cursor
if (optional_cursor_->Pull(frame, context)) {
// if successful, next Pull from this should not pull_input_
pull_input_ = false;
return true;
} else {
// failed to Pull from the merge_match cursor
while (true) {
if (pull_input_) {
// if we have just now pulled from the input
// and failed to pull from optional_ so set the
// optional symbols to Null, ensure next time the
// input gets pulled and return true
for (const Symbol &sym : self_.optional_symbols_) frame[sym] = TypedValue(context.evaluation_context.memory);
pull_input_ = true;
return true;
if (input_cursor_->Pull(frame, context)) {
// after a successful pull from the input
// reset optional_ (it's expand iterators maintain state)
optional_cursor_->Reset();
} else
// input is exhausted, we're done
return false;
}
// pull from the optional_ cursor
if (optional_cursor_->Pull(frame, context)) {
// if successful, next Pull from this should not pull_input_
pull_input_ = false;
return true;
} else {
// failed to Pull from the merge_match cursor
if (pull_input_) {
// if we have just now pulled from the input
// and failed to pull from optional_ so set the
// optional symbols to Null, ensure next time the
// input gets pulled and return true
for (const Symbol &sym : self_.optional_symbols_) frame[sym] = TypedValue(context.evaluation_context.memory);
pull_input_ = true;
return true;
}
// we have exhausted optional_cursor_ after 1 or more successful Pulls
// attempt next input_cursor_ pull
pull_input_ = true;
continue;
}
// we have exhausted optional_cursor_ after 1 or more successful Pulls
// attempt next input_cursor_ pull
pull_input_ = true;
continue;
}
}
}
void Optional::OptionalCursor::Shutdown() {
input_cursor_->Shutdown();
optional_cursor_->Shutdown();
}
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("OptionalMF");
void Optional::OptionalCursor::Reset() {
input_cursor_->Reset();
optional_cursor_->Reset();
pull_input_ = true;
}
EnsureOwnMultiFramesAreGood(output_multi_frame);
auto populated_any{false};
while (true) {
switch (state_) {
case State::PullInput: {
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
state_ = State::Exhausted;
break;
}
uint64_t frame_id{0U};
for (auto &frame : own_multi_frame_->GetValidFramesModifier()) {
frame.SetId(frame_id++);
}
last_matched_frame_ = 0U;
optional_cursor_->Reset();
optional_cursor_->PushDown(*own_multi_frame_);
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
own_frames_it_ = own_frames_consumer_->begin();
if (!optional_frames_consumer_.has_value() || optional_frames_it_ == optional_frames_consumer_->end()) {
state_ = State::PullOptional;
} else {
state_ = State::Populate;
}
break;
}
case State::PullOptional: {
optional_cursor_->PullMultiple(*optional_multi_frame_, context);
optional_frames_consumer_ = optional_multi_frame_->GetValidFramesConsumer();
optional_frames_it_ = optional_frames_consumer_->begin();
state_ = State::Populate;
break;
}
case State::Populate: {
if (own_frames_it_ == own_frames_consumer_->end()) {
state_ = State::PullInput;
continue;
}
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
auto output_frames_it = output_frames_populator.begin();
while (own_frames_it_ != own_frames_consumer_->end() && output_frames_it != output_frames_populator.end()) {
if (optional_frames_it_ != optional_frames_consumer_->end() &&
optional_frames_it_->Id() == own_frames_it_->Id()) {
// This might be a move, but then in we have to have special logic is EnsureOwnMultiFramesAreGood
*output_frames_it = *optional_frames_it_;
++optional_frames_it_;
if (optional_frames_it_->Id() != own_frames_it_->Id()) {
++own_frames_it_;
}
} else {
// TODO(antaljanosbenjamin): Remove (or improve the message of) this assert
MG_ASSERT(optional_frames_it_ == optional_frames_consumer_->end() ||
optional_frames_it_->Id() > own_frames_it_->Id(),
"This should be the case DELETE ME");
for (const auto &symbol : self_.optional_symbols_) {
spdlog::error("{}", symbol.name());
(*own_frames_it_)[symbol] = TypedValue(context.evaluation_context.memory);
}
// This might be a move, but then in we have to have special logic is EnsureOwnMultiFramesAreGood
*output_frames_it = *own_frames_it_;
++own_frames_it_;
}
populated_any = true;
++output_frames_it;
}
break;
}
case State::Exhausted: {
return populated_any;
}
}
}
}
void Shutdown() override {
input_cursor_->Shutdown();
optional_cursor_->Shutdown();
}
void Reset() override {
// TODO(antaljanosbenjamin)
input_cursor_->Reset();
optional_cursor_->Reset();
pull_input_ = true;
}
private:
enum class State { PullInput, PullOptional, Populate, Exhausted };
void EnsureOwnMultiFramesAreGood(MultiFrame &output_multi_frame) {
if (!own_multi_frame_.has_value()) {
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().Elems().size(),
FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource()));
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
own_frames_it_ = own_frames_consumer_->begin();
optional_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().Elems().size(),
FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource()));
}
MG_ASSERT(output_multi_frame.GetFirstFrame().Elems().size() == own_multi_frame_->GetFirstFrame().Elems().size());
}
const Optional &self_;
const UniqueCursorPtr input_cursor_;
const UniqueCursorPtr optional_cursor_;
State state_{State::PullInput};
std::optional<MultiFrame> own_multi_frame_;
std::optional<ValidFramesConsumer> own_frames_consumer_;
ValidFramesConsumer::Iterator own_frames_it_;
std::optional<MultiFrame> optional_multi_frame_;
std::optional<ValidFramesConsumer> optional_frames_consumer_;
ValidFramesConsumer::Iterator optional_frames_it_;
uint64_t last_matched_frame_{0U};
// indicates if the next Pull from this cursor should
// perform a Pull from the input_cursor_
// this is true when:
// - first pulling from this Cursor
// - previous Pull from this cursor exhausted the optional_cursor_
bool pull_input_{true};
};
Unwind::Unwind(const std::shared_ptr<LogicalOperator> &input, Expression *input_expression, Symbol output_symbol)
: input_(input ? input : std::make_shared<Once>()),
@ -3379,6 +3511,8 @@ class DistributedExpandCursor : public Cursor {
return populated_any;
}
void PushDown(const MultiFrame &multi_frame) override { input_cursor_->PushDown(multi_frame); }
void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) {
if (!own_multi_frame_.has_value()) {
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().Elems().size(),

View File

@ -88,6 +88,8 @@ class Cursor {
/// @throws QueryRuntimeException if something went wrong with execution
virtual bool PullMultiple(MultiFrame &, ExecutionContext &) {MG_ASSERT(false, "PullMultipleIsNotImplemented"); return false; }
virtual void PushDown(const MultiFrame&) { MG_ASSERT(false, "PushDownIsNotImplemented"); }
/// Resets the Cursor to its initial state.
virtual void Reset() = 0;
@ -350,11 +352,13 @@ and false on every following Pull.")
public:
OnceCursor() {}
bool PullMultiple(MultiFrame &, ExecutionContext &) override;
void PushDown(const MultiFrame&) override;
bool Pull(Frame &, ExecutionContext &) override;
void Shutdown() override;
void Reset() override;
private:
std::optional<MultiFrame> pushed_down_multi_frame_;
bool did_pull_{false};
};
cpp<#)
@ -1967,27 +1971,6 @@ and returns true, once.")
input_ = input;
}
cpp<#)
(:private
#>cpp
class OptionalCursor : public Cursor {
public:
OptionalCursor(const Optional &, utils::MemoryResource *);
bool Pull(Frame &, ExecutionContext &) override;
void Shutdown() override;
void Reset() override;
private:
const Optional &self_;
const UniqueCursorPtr input_cursor_;
const UniqueCursorPtr optional_cursor_;
// indicates if the next Pull from this cursor should
// perform a Pull from the input_cursor_
// this is true when:
// - first pulling from this Cursor
// - previous Pull from this cursor exhausted the optional_cursor_
bool pull_input_{true};
};
cpp<#)
(:serialize (:slk))
(:clone))