From e888464de212a50cf90a28b52b777d8ca5a59a7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Wed, 18 Jan 2023 17:32:22 +0100 Subject: [PATCH] Implement automaton for `ExpandOneCursor` --- src/query/v2/multiframe.cpp | 4 ++ src/query/v2/multiframe.hpp | 1 + src/query/v2/plan/operator.cpp | 109 ++++++++++++++++++++------------- 3 files changed, 71 insertions(+), 43 deletions(-) diff --git a/src/query/v2/multiframe.cpp b/src/query/v2/multiframe.cpp index 835cdbc0f..14841d2c4 100644 --- a/src/query/v2/multiframe.cpp +++ b/src/query/v2/multiframe.cpp @@ -48,6 +48,10 @@ bool MultiFrame::HasValidFrame() const noexcept { return std::any_of(frames_.begin(), frames_.end(), [](auto &frame) { return frame.IsValid(); }); } +bool MultiFrame::HasInvalidFrame() const noexcept { + return std::any_of(frames_.begin(), frames_.end(), [](auto &frame) { return !frame.IsValid(); }); +} + // NOLINTNEXTLINE (bugprone-exception-escape) void MultiFrame::DefragmentValidFrames() noexcept { /* diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp index f464343b4..6958ffbe8 100644 --- a/src/query/v2/multiframe.hpp +++ b/src/query/v2/multiframe.hpp @@ -82,6 +82,7 @@ class MultiFrame { void MakeAllFramesInvalid() noexcept; bool HasValidFrame() const noexcept; + bool HasInvalidFrame() const noexcept; inline utils::MemoryResource *GetMemoryResource() { return frames_[0].GetMemoryResource(); } diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 900ec15a0..eba45b0b7 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -2827,9 +2827,15 @@ class DistributedExpandCursor : public Cursor { vertex_id_to_result_row.erase(vertex.Id()); } - void PullEdgesFromStorage(ExecutionContext &context) { - // Input Vertex could be null if it is created by a failed optional match. In - // those cases we skip that input pull and continue with the next. 
+ bool PullInputFrames(ExecutionContext &context) { + input_cursor_->PullMultiple(*own_multi_frame_, context); + // These need to be updated regardless of the result of the pull, otherwise the consumer and iterator might + // get corrupted because of the operations done on our MultiFrame. + own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer(); + own_frames_it_ = own_frames_consumer_->begin(); + if (!own_multi_frame_->HasValidFrame()) { + return false; + } msgs::ExpandOneRequest request; request.direction = DirectionToMsgsDirection(self_.common_.direction); @@ -2842,7 +2848,7 @@ class DistributedExpandCursor : public Cursor { MG_ASSERT(!vertex_value.IsNull()); ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); - auto &vertex = vertex_value.ValueVertex(); + const auto &vertex = vertex_value.ValueVertex(); request.src_vertices.push_back(vertex.Id()); } @@ -2854,6 +2860,8 @@ class DistributedExpandCursor : public Cursor { for (auto &row : result_rows_) { vertex_id_to_result_row[row.src_vertex.id] = &row; } + + return true; } void PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { @@ -2862,49 +2870,55 @@ class DistributedExpandCursor : public Cursor { EnsureOwnMultiFrameIsGood(output_multi_frame); // A helper function for expanding a node from an edge. 
+ auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + while (true) { - if (MustAbort(context)) throw HintedAbortError(); - if (own_frames_it_ == own_frames_consumer_->end()) { - input_cursor_->PullMultiple(*own_multi_frame_, context); - own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer(); - own_frames_it_ = own_frames_consumer_->begin(); - if (!own_multi_frame_->HasValidFrame()) { + switch (state_) { + case State::PullInputAndEdges: { + if (!PullInputFrames(context)) { + state_ = State::Exhausted; + return; + } + state_ = State::InitInOutEdgesIt; break; } - - PullEdgesFromStorage(context); - InitEdgesMultiple(context); - } - - while (own_frames_it_ != own_frames_consumer_->end()) { - if (current_in_edge_it_ == current_in_edges_.end() && current_out_edge_it_ == current_out_edges_.end()) { - own_frames_it_->MakeInvalid(); - ++own_frames_it_; - - InitEdgesMultiple(context); - } - - auto &input_frame = *own_frames_it_; - - auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); - - auto populate_edges = [this, &context, &output_frames_populator, &input_frame]( - std::vector::iterator &current, - const std::vector::iterator &end) { - for (auto output_frame_it = output_frames_populator.begin(); - output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) { - auto &edge = *current; - ++current; - auto &output_frame = *output_frame_it++; - output_frame = input_frame; - output_frame[self_.common_.edge_symbol] = edge; - PullDstVertex(output_frame, context, EdgeAtom::Direction::IN); + case State::InitInOutEdgesIt: { + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInputAndEdges; + } else { + InitEdges(*own_frames_it_, context); + state_ = State::PopulateOutput; } - }; - populate_edges(current_in_edge_it_, current_in_edges_.end()); - populate_edges(current_out_edge_it_, current_out_edges_.end()); - - if (output_frames_populator.begin() == 
output_frames_populator.end()) { + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + return; + } + if (current_in_edge_it_ == current_in_edges_.end() && current_out_edge_it_ == current_out_edges_.end()) { + own_frames_it_->MakeInvalid(); + ++own_frames_it_; + state_ = State::InitInOutEdgesIt; + continue; + } + auto populate_edges = [this, &context, &output_frames_populator]( + const EdgeAtom::Direction direction, std::vector::iterator &current, + const std::vector::iterator &end) { + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) { + auto &edge = *current; + ++current; + auto &output_frame = *output_frame_it++; + output_frame = *own_frames_it_; + output_frame[self_.common_.edge_symbol] = edge; + PullDstVertex(output_frame, context, direction); + } + }; + populate_edges(EdgeAtom::Direction::IN, current_in_edge_it_, current_in_edges_.end()); + populate_edges(EdgeAtom::Direction::OUT, current_out_edge_it_, current_out_edges_.end()); + break; + } + case State::Exhausted: { return; } } @@ -2925,6 +2939,12 @@ class DistributedExpandCursor : public Cursor { void Reset() override { input_cursor_->Reset(); + vertex_id_to_result_row.clear(); + result_rows_.clear(); + own_frames_it_ = ValidFramesConsumer::Iterator{}; + own_frames_consumer_.reset(); + own_multi_frame_->MakeAllFramesInvalid(); + state_ = State::PullInputAndEdges; current_in_edges_.clear(); current_out_edges_.clear(); current_in_edge_it_ = current_in_edges_.end(); @@ -2932,12 +2952,15 @@ class DistributedExpandCursor : public Cursor { } private: + enum class State { PullInputAndEdges, InitInOutEdgesIt, PopulateOutput, Exhausted }; + const Expand &self_; const UniqueCursorPtr input_cursor_; std::vector current_in_edges_; std::vector current_out_edges_; std::vector::iterator current_in_edge_it_; std::vector::iterator current_out_edge_it_; + State 
state_{State::PullInputAndEdges}; std::optional own_multi_frame_; std::optional own_frames_consumer_; ValidFramesConsumer::Iterator own_frames_it_;