Full implementation with nested state machines
This commit is contained in:
parent
85482dfaf2
commit
83fca6b022
@ -2177,89 +2177,62 @@ class OptionalCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("OptionalMF");
|
||||
|
||||
EnsureOwnMultiFramesAreGood(output_multi_frame);
|
||||
auto populated_any{false};
|
||||
|
||||
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||
auto output_frames_it = output_frames_populator.begin();
|
||||
|
||||
bool HandleReadyInput(InvalidFramesPopulator &output_populator, InvalidFramesPopulator::Iterator &output_frames_it,
|
||||
ExecutionContext &context) {
|
||||
bool populated_any = false;
|
||||
while (true) {
|
||||
if (output_frames_it == output_frames_populator.end()) {
|
||||
return populated_any;
|
||||
}
|
||||
switch (state_) {
|
||||
case State::PullInput: {
|
||||
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
|
||||
state_ = State::Exhausted;
|
||||
break;
|
||||
}
|
||||
uint64_t frame_id{0U};
|
||||
for (auto &frame : own_multi_frame_->GetValidFramesModifier()) {
|
||||
frame.SetId(frame_id++);
|
||||
}
|
||||
last_matched_frame_ = 0U;
|
||||
optional_cursor_->Reset();
|
||||
optional_cursor_->PushDown(*own_multi_frame_);
|
||||
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
state_ = State::PullOptional;
|
||||
break;
|
||||
}
|
||||
case State::PullOptional: {
|
||||
switch (optional_state_) {
|
||||
case State::Pull: {
|
||||
if (!optional_cursor_->PullMultiple(*optional_multi_frame_, context)) {
|
||||
state_ = State::OptionalExhausted;
|
||||
optional_state_ = State::Exhausted;
|
||||
optional_frames_consumer_.reset();
|
||||
optional_frames_it_ = {};
|
||||
if (populated_any) {
|
||||
++own_frames_it_;
|
||||
}
|
||||
} else {
|
||||
optional_frames_consumer_ = optional_multi_frame_->GetValidFramesConsumer();
|
||||
optional_frames_it_ = optional_frames_consumer_->begin();
|
||||
state_ = State::Populate;
|
||||
optional_state_ = State::Ready;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case State::Populate: {
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
state_ = State::PullInput;
|
||||
continue;
|
||||
}
|
||||
while (own_frames_it_ != own_frames_consumer_->end() && output_frames_it != output_frames_populator.end()) {
|
||||
if (optional_frames_consumer_.has_value() && optional_frames_it_ != optional_frames_consumer_->end() &&
|
||||
optional_frames_it_->Id() == own_frames_it_->Id()) {
|
||||
case State::Ready: {
|
||||
while (optional_frames_it_ != optional_frames_consumer_->end()) {
|
||||
if (output_frames_it == output_populator.end()) {
|
||||
return populated_any;
|
||||
}
|
||||
populated_any = true;
|
||||
if (optional_frames_it_->Id() == own_frames_it_->Id()) {
|
||||
// This might be a move, but then in we have to have special logic is EnsureOwnMultiFramesAreGood
|
||||
*output_frames_it = *optional_frames_it_;
|
||||
last_matched_frame_ = optional_frames_it_->Id();
|
||||
optional_frames_it_->MakeInvalid();
|
||||
++optional_frames_it_;
|
||||
++output_frames_it;
|
||||
if (optional_frames_it_ == optional_frames_consumer_->end()) {
|
||||
++own_frames_it_;
|
||||
state_ = State::PullOptional;
|
||||
populated_any = true;
|
||||
break;
|
||||
}
|
||||
if (optional_frames_it_->Id() != own_frames_it_->Id()) {
|
||||
++own_frames_it_;
|
||||
optional_state_ = State::Pull;
|
||||
}
|
||||
} else if (last_matched_frame_ == own_frames_it_->Id()) {
|
||||
++own_frames_it_;
|
||||
} else {
|
||||
// TODO(antaljanosbenjamin): Remove (or improve the message of) this assert
|
||||
MG_ASSERT(!optional_frames_consumer_.has_value() ||
|
||||
optional_frames_it_ == optional_frames_consumer_->end() ||
|
||||
optional_frames_it_->Id() > own_frames_it_->Id(),
|
||||
"This should be the case DELETE ME");
|
||||
MG_ASSERT(optional_frames_it_->Id() > own_frames_it_->Id(), "This should be the case DELETE ME");
|
||||
for (const auto &symbol : self_.optional_symbols_) {
|
||||
spdlog::error("{}", symbol.name());
|
||||
(*own_frames_it_)[symbol] = TypedValue(context.evaluation_context.memory);
|
||||
}
|
||||
// This might be a move, but then in we have to have special logic is EnsureOwnMultiFramesAreGood
|
||||
*output_frames_it = *own_frames_it_;
|
||||
last_matched_frame_ = own_frames_it_->Id();
|
||||
own_frames_it_->MakeInvalid();
|
||||
++own_frames_it_;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case State::OptionalExhausted: {
|
||||
while (own_frames_it_ != own_frames_consumer_->end() && output_frames_it != output_frames_populator.end()) {
|
||||
case State::Exhausted: {
|
||||
while (own_frames_it_ != own_frames_consumer_->end() && output_frames_it != output_populator.end()) {
|
||||
MG_ASSERT(!optional_frames_consumer_.has_value(), "This should be the case DELETE ME");
|
||||
for (const auto &symbol : self_.optional_symbols_) {
|
||||
spdlog::error("{}", symbol.name());
|
||||
@ -2270,14 +2243,58 @@ class OptionalCursor : public Cursor {
|
||||
++own_frames_it_;
|
||||
|
||||
populated_any = true;
|
||||
own_frames_it_->MakeInvalid();
|
||||
++output_frames_it;
|
||||
}
|
||||
return populated_any;
|
||||
}
|
||||
}
|
||||
}
|
||||
return populated_any;
|
||||
}
|
||||
|
||||
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("OptionalMF");
|
||||
|
||||
EnsureOwnMultiFramesAreGood(output_multi_frame);
|
||||
auto populated_any{false};
|
||||
|
||||
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||
auto output_frames_it = output_frames_populator.begin();
|
||||
while (true) {
|
||||
switch (input_state_) {
|
||||
case State::Pull: {
|
||||
MG_ASSERT(optional_state_ != State::Ready, "Unexpected state");
|
||||
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
|
||||
input_state_ = State::Exhausted;
|
||||
optional_state_ = State::Exhausted;
|
||||
} else {
|
||||
input_state_ = State::Ready;
|
||||
optional_state_ = State::Pull;
|
||||
uint64_t frame_id{0U};
|
||||
for (auto &frame : own_multi_frame_->GetValidFramesModifier()) {
|
||||
frame.SetId(frame_id++);
|
||||
}
|
||||
last_matched_frame_ = 0U;
|
||||
optional_cursor_->Reset();
|
||||
optional_cursor_->PushDown(*own_multi_frame_);
|
||||
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case State::Ready: {
|
||||
populated_any |= HandleReadyInput(output_frames_populator, output_frames_it, context);
|
||||
if (output_frames_it == output_frames_populator.end()) {
|
||||
return populated_any;
|
||||
}
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
state_ = State::PullInput;
|
||||
input_state_ = State::Pull;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case State::Exhausted: {
|
||||
MG_ASSERT(optional_state_ == State::Exhausted);
|
||||
return populated_any;
|
||||
}
|
||||
}
|
||||
@ -2297,7 +2314,7 @@ class OptionalCursor : public Cursor {
|
||||
}
|
||||
|
||||
private:
|
||||
enum class State { PullInput, PullOptional, Populate, OptionalExhausted, Exhausted };
|
||||
enum class State { Pull, Ready, Exhausted };
|
||||
|
||||
void EnsureOwnMultiFramesAreGood(MultiFrame &output_multi_frame) {
|
||||
if (!own_multi_frame_.has_value()) {
|
||||
@ -2315,7 +2332,8 @@ class OptionalCursor : public Cursor {
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
const UniqueCursorPtr optional_cursor_;
|
||||
|
||||
State state_{State::PullInput};
|
||||
State input_state_{State::Pull};
|
||||
State optional_state_{State::Pull};
|
||||
std::optional<MultiFrame> own_multi_frame_;
|
||||
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator own_frames_it_;
|
||||
@ -3382,7 +3400,8 @@ class DistributedExpandCursor : public Cursor {
|
||||
void InitEdgesMultiple() {
|
||||
// This function won't work if any vertex id is duplicated in the input, because:
|
||||
// 1. vertex_id_to_result_row is not a multimap
|
||||
// 2. if self_.common_.existing_node is true, then we erase edges that might be necessary for the input vertex on a
|
||||
// 2. if self_.common_.existing_node is true, then we erase edges that might be necessary for the input
|
||||
// vertex on a
|
||||
// later frame
|
||||
const auto &frame = (*own_frames_it_);
|
||||
const auto &vertex_value = frame[self_.input_symbol_];
|
||||
|
Loading…
Reference in New Issue
Block a user