From af812d1311f75d936e1dd18c4dbec047ffc9c986 Mon Sep 17 00:00:00 2001
From: jeremy <jeremy.bailleux@memgraph.io>
Date: Tue, 13 Dec 2022 09:05:39 +0100
Subject: [PATCH 01/13] Implement scanAll MultiFrame version

---
 src/query/v2/multiframe.hpp    |  5 +-
 src/query/v2/plan/operator.cpp | 85 ++++++++++++++++++++++++++++++----
 2 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index e13eb07ac..3e063bdde 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -201,10 +201,9 @@ class ValidFramesConsumer {
 
   ~ValidFramesConsumer() noexcept;
   ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
-  ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
+  ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
   ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
-  ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
-
+  ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default;
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;
     using difference_type = std::ptrdiff_t;
diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 09c0837c0..804312800 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -37,6 +37,7 @@
 #include "query/v2/db_accessor.hpp"
 #include "query/v2/exceptions.hpp"
 #include "query/v2/frontend/ast/ast.hpp"
+#include "query/v2/multiframe.hpp"
 #include "query/v2/path.hpp"
 #include "query/v2/plan/scoped_profile.hpp"
 #include "query/v2/request_router.hpp"
@@ -473,11 +474,11 @@ class DistributedScanAllAndFilterCursor : public Cursor {
       if (label_.has_value()) {
         request_label = request_router.LabelToName(*label_);
       }
-      current_batch = request_router.ScanVertices(request_label);
+      current_batch_ = request_router.ScanVertices(request_label);
     }
-    current_vertex_it = current_batch.begin();
+    current_vertex_it_ = current_batch_.begin();
     request_state_ = State::COMPLETED;
-    return !current_batch.empty();
+    return !current_batch_.empty();
   }
 
   bool Pull(Frame &frame, ExecutionContext &context) override {
@@ -495,23 +496,85 @@ class DistributedScanAllAndFilterCursor : public Cursor {
         }
       }
 
-      if (current_vertex_it == current_batch.end() &&
+      if (current_vertex_it_ == current_batch_.end() &&
           (request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
         ResetExecutionState();
         continue;
       }
 
-      frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
-      ++current_vertex_it;
+      frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
+      ++current_vertex_it_;
       return true;
     }
   }
 
+  void Generate(ExecutionContext &context) {
+    auto &request_router = *context.request_router;
+
+    input_cursor_->PullMultiple(*own_multi_frames_, context);
+
+    if (!MakeRequest(request_router, context)) {
+      return;
+    }
+
+    auto valid_frames_consumer = own_multi_frames_->GetValidFramesConsumer();
+    auto valid_frames_it = valid_frames_consumer.begin();
+
+    for (auto valid_frames_it = valid_frames_consumer.begin(); valid_frames_it != valid_frames_consumer.end();
+         ++valid_frames_it) {
+      for (auto vertex_it = current_batch_.begin(); vertex_it != current_batch_.end(); ++vertex_it) {
+        auto frame = *valid_frames_it;
+        frame[output_symbol_] = TypedValue(*vertex_it);
+        frames_buffer_.push(std::move(frame));
+      }
+      valid_frames_it->MakeInvalid();
+    }
+  }
+
+  void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
+    SCOPED_PROFILE_OP(op_name_);
+
+    if (!own_multi_frames_.has_value()) {
+      own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(),
+                                           kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
+    }
+
+    auto &request_router = *context.request_router;
+    auto should_make_request = false;
+    auto should_pull = false;
+    auto should_generate = false;
+
+    while (true) {
+      if (MustAbort(context)) {
+        throw HintedAbortError();
+      }
+
+      const auto should_generate = frames_buffer_.empty();
+      if (should_generate) {
+        Generate(context);
+      }
+
+      auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
+      auto invalid_frame_it = invalid_frames_populator.begin();
+      auto has_modified_at_least_one_frame = false;
+      while (invalid_frames_populator.end() != invalid_frame_it && !frames_buffer_.empty()) {
+        has_modified_at_least_one_frame = true;
+        *invalid_frame_it = frames_buffer_.front();
+        ++invalid_frame_it;
+        frames_buffer_.pop();
+      }
+
+      if (!has_modified_at_least_one_frame) {
+        return;
+      }
+    }
+  };
+
   void Shutdown() override { input_cursor_->Shutdown(); }
 
   void ResetExecutionState() {
-    current_batch.clear();
-    current_vertex_it = current_batch.end();
+    current_batch_.clear();
+    current_vertex_it_ = current_batch_.end();
     request_state_ = State::INITIALIZING;
   }
 
@@ -524,12 +587,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
   const Symbol output_symbol_;
   const UniqueCursorPtr input_cursor_;
   const char *op_name_;
-  std::vector<VertexAccessor> current_batch;
-  std::vector<VertexAccessor>::iterator current_vertex_it;
+  std::vector<VertexAccessor> current_batch_;
+  std::vector<VertexAccessor>::iterator current_vertex_it_;
   State request_state_ = State::INITIALIZING;
   std::optional<storage::v3::LabelId> label_;
   std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
   std::optional<std::vector<Expression *>> filter_expressions_;
+  std::optional<MultiFrame> own_multi_frames_;
+  std::queue<FrameWithValidity> frames_buffer_;
 };
 
 ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)

From ac16348fff3d5f567d90faae944540b02b2821f4 Mon Sep 17 00:00:00 2001
From: jeremy <jeremy.bailleux@memgraph.io>
Date: Tue, 13 Dec 2022 09:50:42 +0100
Subject: [PATCH 02/13] Remove unused variable

---
 src/query/v2/plan/operator.cpp | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 804312800..1569840c4 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -518,7 +518,6 @@ class DistributedScanAllAndFilterCursor : public Cursor {
     }
 
     auto valid_frames_consumer = own_multi_frames_->GetValidFramesConsumer();
-    auto valid_frames_it = valid_frames_consumer.begin();
 
     for (auto valid_frames_it = valid_frames_consumer.begin(); valid_frames_it != valid_frames_consumer.end();
          ++valid_frames_it) {
@@ -539,11 +538,6 @@ class DistributedScanAllAndFilterCursor : public Cursor {
                                            kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
     }
 
-    auto &request_router = *context.request_router;
-    auto should_make_request = false;
-    auto should_pull = false;
-    auto should_generate = false;
-
     while (true) {
       if (MustAbort(context)) {
         throw HintedAbortError();

From 83306d21defd278ca8f0bba7edc71b685338e832 Mon Sep 17 00:00:00 2001
From: jeremy <jeremy.bailleux@memgraph.io>
Date: Tue, 13 Dec 2022 09:50:50 +0100
Subject: [PATCH 03/13] Revert changes

---
 src/query/v2/multiframe.hpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index 3e063bdde..d61755857 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -201,9 +201,9 @@ class ValidFramesConsumer {
 
   ~ValidFramesConsumer() noexcept;
   ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
-  ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
+  ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
   ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
-  ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default;
+  ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;
     using difference_type = std::ptrdiff_t;

From 54ce79baa02b281c8ca96d41c2c69dc0ef54c0d2 Mon Sep 17 00:00:00 2001
From: jeremy <jeremy.bailleux@memgraph.io>
Date: Tue, 13 Dec 2022 09:51:42 +0100
Subject: [PATCH 04/13] Add empty line

---
 src/query/v2/multiframe.hpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index d61755857..e13eb07ac 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -204,6 +204,7 @@ class ValidFramesConsumer {
   ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
   ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
   ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
+
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;
     using difference_type = std::ptrdiff_t;

From 311994a36ddda5ed7cddaff9fd364e3b96ba4f44 Mon Sep 17 00:00:00 2001
From: jeremy <jeremy.bailleux@memgraph.io>
Date: Thu, 15 Dec 2022 14:31:43 +0100
Subject: [PATCH 05/13] Impl of version more memory friendly

---
 src/query/v2/multiframe.hpp    |  6 ++--
 src/query/v2/plan/operator.cpp | 54 +++++++++++++++++++++-------------
 2 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index e13eb07ac..aeacda7d3 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -200,9 +200,9 @@ class ValidFramesConsumer {
   explicit ValidFramesConsumer(MultiFrame &multiframe);
 
   ~ValidFramesConsumer() noexcept;
-  ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
-  ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
-  ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
+  ValidFramesConsumer(const ValidFramesConsumer &other) = default;
+  ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
+  ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = default;
   ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
 
   struct Iterator {
diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 1569840c4..3e8d533c5 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -508,26 +508,39 @@ class DistributedScanAllAndFilterCursor : public Cursor {
     }
   }
 
-  void Generate(ExecutionContext &context) {
+  void PrepareNextFrames(ExecutionContext &context) {
     auto &request_router = *context.request_router;
 
     input_cursor_->PullMultiple(*own_multi_frames_, context);
+    valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer();
+    valid_frames_it_ = valid_frames_consumer_->begin();
 
-    if (!MakeRequest(request_router, context)) {
-      return;
-    }
+    MakeRequest(request_router, context);
 
-    auto valid_frames_consumer = own_multi_frames_->GetValidFramesConsumer();
+    has_next_frame_ = current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end();
+  }
 
-    for (auto valid_frames_it = valid_frames_consumer.begin(); valid_frames_it != valid_frames_consumer.end();
-         ++valid_frames_it) {
-      for (auto vertex_it = current_batch_.begin(); vertex_it != current_batch_.end(); ++vertex_it) {
-        auto frame = *valid_frames_it;
-        frame[output_symbol_] = TypedValue(*vertex_it);
-        frames_buffer_.push(std::move(frame));
+  inline bool HasNextFrame() { return has_next_frame_; }
+
+  FrameWithValidity GetNextFrame(ExecutionContext &context) {
+    MG_ASSERT(HasNextFrame());
+
+    auto frame = *valid_frames_it_;
+    frame[output_symbol_] = TypedValue(*current_vertex_it_);
+
+    ++current_vertex_it_;
+    if (current_vertex_it_ == current_batch_.end()) {
+      valid_frames_it_->MakeInvalid();
+      ++valid_frames_it_;
+
+      if (valid_frames_it_ == valid_frames_consumer_->end()) {
+        PrepareNextFrames(context);
+      } else {
+        current_vertex_it_ = current_batch_.begin();
       }
-      valid_frames_it->MakeInvalid();
-    }
+    };
+
+    return frame;
   }
 
   void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
@@ -536,6 +549,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
     if (!own_multi_frames_.has_value()) {
       own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(),
                                            kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
+      PrepareNextFrames(context);
     }
 
     while (true) {
@@ -543,19 +557,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
         throw HintedAbortError();
       }
 
-      const auto should_generate = frames_buffer_.empty();
-      if (should_generate) {
-        Generate(context);
-      }
-
       auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
       auto invalid_frame_it = invalid_frames_populator.begin();
       auto has_modified_at_least_one_frame = false;
-      while (invalid_frames_populator.end() != invalid_frame_it && !frames_buffer_.empty()) {
+
+      while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) {
         has_modified_at_least_one_frame = true;
-        *invalid_frame_it = frames_buffer_.front();
+        *invalid_frame_it = GetNextFrame(context);
         ++invalid_frame_it;
-        frames_buffer_.pop();
       }
 
       if (!has_modified_at_least_one_frame) {
@@ -588,7 +597,10 @@ class DistributedScanAllAndFilterCursor : public Cursor {
   std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
   std::optional<std::vector<Expression *>> filter_expressions_;
   std::optional<MultiFrame> own_multi_frames_;
+  std::optional<ValidFramesConsumer> valid_frames_consumer_;
+  ValidFramesConsumer::Iterator valid_frames_it_;
   std::queue<FrameWithValidity> frames_buffer_;
+  bool has_next_frame_;
 };
 
 ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)

From 1aa40e5e3fad1e9b4a482157988ec8bd355a3ed0 Mon Sep 17 00:00:00 2001
From: jeremy <jeremy.bailleux@memgraph.io>
Date: Thu, 15 Dec 2022 16:24:45 +0100
Subject: [PATCH 06/13] Add const to method

---
 src/query/v2/plan/operator.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 3e8d533c5..b6bbf9ce1 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -520,7 +520,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
     has_next_frame_ = current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end();
   }
 
-  inline bool HasNextFrame() { return has_next_frame_; }
+  inline bool HasNextFrame() const { return has_next_frame_; }
 
   FrameWithValidity GetNextFrame(ExecutionContext &context) {
     MG_ASSERT(HasNextFrame());

From 751c27f792d91d6a58d7601334eb17ae33295061 Mon Sep 17 00:00:00 2001
From: jeremy <jeremy.bailleux@memgraph.io>
Date: Tue, 20 Dec 2022 10:12:50 +0100
Subject: [PATCH 07/13] Get ride of attribute has_valid_frames_

---
 src/query/v2/plan/operator.cpp | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index b6bbf9ce1..f162ec9ea 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -516,11 +516,11 @@ class DistributedScanAllAndFilterCursor : public Cursor {
     valid_frames_it_ = valid_frames_consumer_->begin();
 
     MakeRequest(request_router, context);
-
-    has_next_frame_ = current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end();
   }
 
-  inline bool HasNextFrame() const { return has_next_frame_; }
+  inline bool HasNextFrame() {
+    return current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end();
+  }
 
   FrameWithValidity GetNextFrame(ExecutionContext &context) {
     MG_ASSERT(HasNextFrame());
@@ -600,7 +600,6 @@ class DistributedScanAllAndFilterCursor : public Cursor {
   std::optional<ValidFramesConsumer> valid_frames_consumer_;
   ValidFramesConsumer::Iterator valid_frames_it_;
   std::queue<FrameWithValidity> frames_buffer_;
-  bool has_next_frame_;
 };
 
 ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)

From b91b16de963b3c7b2cd2202dc2b5602aa0d4e915 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <benjamin.antal@memgraph.io>
Date: Tue, 17 Jan 2023 07:06:25 +0100
Subject: [PATCH 08/13] Fix `Interpreter::PullMultiple` for queries that return
 some values

---
 src/query/v2/interpreter.cpp | 20 +++++++++++---------
 src/query/v2/multiframe.cpp  |  4 ----
 src/query/v2/multiframe.hpp  |  1 -
 3 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp
index 594942aec..23386bd1c 100644
--- a/src/query/v2/interpreter.cpp
+++ b/src/query/v2/interpreter.cpp
@@ -704,7 +704,6 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
   ctx_.request_router = request_router;
   ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc;
 }
-
 std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStream *stream, std::optional<int> n,
                                                                         const std::vector<Symbol> &output_symbols,
                                                                         std::map<std::string, TypedValue> *summary) {
@@ -734,7 +733,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
   // Returns true if a result was pulled.
   const auto pull_result = [&]() -> bool {
     cursor_->PullMultiple(multi_frame_, ctx_);
-    return !multi_frame_.HasInvalidFrame();
+    return multi_frame_.HasValidFrame();
   };
 
   const auto stream_values = [&output_symbols, &stream](const Frame &frame) {
@@ -755,13 +754,14 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
   int i = 0;
   if (has_unsent_results_ && !output_symbols.empty()) {
     // stream unsent results from previous pull
-
-    auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
-    for (const auto &frame : iterator_for_valid_frame_only) {
+    for (auto &frame : multi_frame_.GetValidFramesConsumer()) {
       stream_values(frame);
+      frame.MakeInvalid();
       ++i;
+      if (i == n) {
+        break;
+      }
     }
-    multi_frame_.MakeAllFramesInvalid();
   }
 
   for (; !n || i < n;) {
@@ -770,13 +770,15 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
     }
 
     if (!output_symbols.empty()) {
-      auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
-      for (const auto &frame : iterator_for_valid_frame_only) {
+      for (auto &frame : multi_frame_.GetValidFramesConsumer()) {
         stream_values(frame);
+        frame.MakeInvalid();
         ++i;
+        if (i == n) {
+          break;
+        }
       }
     }
-    multi_frame_.MakeAllFramesInvalid();
   }
 
   // If we finished because we streamed the requested n results,
diff --git a/src/query/v2/multiframe.cpp b/src/query/v2/multiframe.cpp
index 0ddfd3aa7..835cdbc0f 100644
--- a/src/query/v2/multiframe.cpp
+++ b/src/query/v2/multiframe.cpp
@@ -48,10 +48,6 @@ bool MultiFrame::HasValidFrame() const noexcept {
   return std::any_of(frames_.begin(), frames_.end(), [](auto &frame) { return frame.IsValid(); });
 }
 
-bool MultiFrame::HasInvalidFrame() const noexcept {
-  return std::any_of(frames_.rbegin(), frames_.rend(), [](auto &frame) { return !frame.IsValid(); });
-}
-
 // NOLINTNEXTLINE (bugprone-exception-escape)
 void MultiFrame::DefragmentValidFrames() noexcept {
   /*
diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index f84fe6421..5f821bb6b 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -81,7 +81,6 @@ class MultiFrame {
   void MakeAllFramesInvalid() noexcept;
 
   bool HasValidFrame() const noexcept;
-  bool HasInvalidFrame() const noexcept;
 
   inline utils::MemoryResource *GetMemoryResource() { return frames_[0].GetMemoryResource(); }
 

From 36891c119b28d472e4d831af2f318cc36c3fd670 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <benjamin.antal@memgraph.io>
Date: Tue, 17 Jan 2023 07:17:53 +0100
Subject: [PATCH 09/13] Remove unnecessary state from
 `DistributedScanAllAndFilterCursor`

---
 src/query/v2/plan/operator.cpp | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index a57799367..1e84eaae5 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -467,31 +467,24 @@ class DistributedScanAllAndFilterCursor : public Cursor {
       current_batch_ = request_router.ScanVertices(request_label);
     }
     current_vertex_it_ = current_batch_.begin();
-    request_state_ = State::COMPLETED;
     return !current_batch_.empty();
   }
 
   bool Pull(Frame &frame, ExecutionContext &context) override {
     SCOPED_PROFILE_OP(op_name_);
 
-    auto &request_router = *context.request_router;
     while (true) {
       if (MustAbort(context)) {
         throw HintedAbortError();
       }
 
-      if (request_state_ == State::INITIALIZING) {
-        if (!input_cursor_->Pull(frame, context)) {
+      if (current_vertex_it_ == current_batch_.end()) {
+        ResetExecutionState();
+        if (!input_cursor_->Pull(frame, context) || !MakeRequest(*context.request_router, context)) {
           return false;
         }
       }
 
-      if (current_vertex_it_ == current_batch_.end() &&
-          (request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
-        ResetExecutionState();
-        continue;
-      }
-
       frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
       ++current_vertex_it_;
       return true;
@@ -568,7 +561,6 @@ class DistributedScanAllAndFilterCursor : public Cursor {
   void ResetExecutionState() {
     current_batch_.clear();
     current_vertex_it_ = current_batch_.end();
-    request_state_ = State::INITIALIZING;
   }
 
   void Reset() override {
@@ -581,15 +573,13 @@ class DistributedScanAllAndFilterCursor : public Cursor {
   const UniqueCursorPtr input_cursor_;
   const char *op_name_;
   std::vector<VertexAccessor> current_batch_;
-  std::vector<VertexAccessor>::iterator current_vertex_it_;
-  State request_state_ = State::INITIALIZING;
+  std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()};
   std::optional<storage::v3::LabelId> label_;
   std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
   std::optional<std::vector<Expression *>> filter_expressions_;
   std::optional<MultiFrame> own_multi_frames_;
   std::optional<ValidFramesConsumer> valid_frames_consumer_;
   ValidFramesConsumer::Iterator valid_frames_it_;
-  std::queue<FrameWithValidity> frames_buffer_;
 };
 
 ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
@@ -597,8 +587,6 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy
 
 ACCEPT_WITH_INPUT(ScanAll)
 
-class DistributedScanAllCursor;
-
 UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
   EventCounter::IncrementCounter(EventCounter::ScanAllOperator);
 

From d11d5c3fa93c02383566f712775c669861e32ae6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <benjamin.antal@memgraph.io>
Date: Tue, 17 Jan 2023 08:33:40 +0100
Subject: [PATCH 10/13] Make special member functions of `MultiFrame` iterators
 consistent

---
 src/query/v2/multiframe.hpp | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index 5f821bb6b..28009eb5d 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -96,9 +96,9 @@ class ValidFramesReader {
 
   ~ValidFramesReader() = default;
   ValidFramesReader(const ValidFramesReader &other) = delete;
-  ValidFramesReader(ValidFramesReader &&other) noexcept = delete;
+  ValidFramesReader(ValidFramesReader &&other) noexcept = default;
   ValidFramesReader &operator=(const ValidFramesReader &other) = delete;
-  ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = delete;
+  ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = default;
 
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;
@@ -146,9 +146,9 @@ class ValidFramesModifier {
 
   ~ValidFramesModifier() = default;
   ValidFramesModifier(const ValidFramesModifier &other) = delete;
-  ValidFramesModifier(ValidFramesModifier &&other) noexcept = delete;
+  ValidFramesModifier(ValidFramesModifier &&other) noexcept = default;
   ValidFramesModifier &operator=(const ValidFramesModifier &other) = delete;
-  ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = delete;
+  ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = default;
 
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;
@@ -200,10 +200,10 @@ class ValidFramesConsumer {
   explicit ValidFramesConsumer(MultiFrame &multiframe);
 
   ~ValidFramesConsumer() noexcept;
-  ValidFramesConsumer(const ValidFramesConsumer &other) = default;
+  ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
   ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
-  ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = default;
-  ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
+  ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
+  ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default;
 
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;

From 57690c5390b0ba6e59ffe9410a0b95c07681239c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <benjamin.antal@memgraph.io>
Date: Tue, 17 Jan 2023 08:34:08 +0100
Subject: [PATCH 11/13] Refactor `DistributedScanAllAndFilterCursor`

---
 src/query/v2/plan/operator.cpp | 81 +++++++++++++++-------------------
 1 file changed, 36 insertions(+), 45 deletions(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 1e84eaae5..9841791e6 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -457,14 +457,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
 
   using VertexAccessor = accessors::VertexAccessor;
 
-  bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) {
+  bool MakeRequest(ExecutionContext &context) {
     {
       SCOPED_REQUEST_WAIT_PROFILE;
       std::optional<std::string> request_label = std::nullopt;
       if (label_.has_value()) {
-        request_label = request_router.LabelToName(*label_);
+        request_label = context.request_router->LabelToName(*label_);
       }
-      current_batch_ = request_router.ScanVertices(request_label);
+      current_batch_ = context.request_router->ScanVertices(request_label);
     }
     current_vertex_it_ = current_batch_.begin();
     return !current_batch_.empty();
@@ -480,7 +480,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
 
       if (current_vertex_it_ == current_batch_.end()) {
         ResetExecutionState();
-        if (!input_cursor_->Pull(frame, context) || !MakeRequest(*context.request_router, context)) {
+        if (!input_cursor_->Pull(frame, context) || !MakeRequest(context)) {
           return false;
         }
       }
@@ -491,66 +491,57 @@ class DistributedScanAllAndFilterCursor : public Cursor {
     }
   }
 
-  void PrepareNextFrames(ExecutionContext &context) {
-    auto &request_router = *context.request_router;
-
-    input_cursor_->PullMultiple(*own_multi_frames_, context);
-    valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer();
-    valid_frames_it_ = valid_frames_consumer_->begin();
-
-    MakeRequest(request_router, context);
+  bool PullNextFrames(ExecutionContext &context) {
+    input_cursor_->PullMultiple(*own_multi_frame_, context);
+    own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
+    own_frames_it_ = own_frames_consumer_->begin();
+    return own_multi_frame_->HasValidFrame();
   }
 
-  inline bool HasNextFrame() {
-    return current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end();
+  inline bool HasMoreResult() {
+    return current_vertex_it_ != current_batch_.end() && own_frames_it_ != own_frames_consumer_->end();
   }
 
-  FrameWithValidity GetNextFrame(ExecutionContext &context) {
-    MG_ASSERT(HasNextFrame());
+  bool PopulateFrame(ExecutionContext &context, FrameWithValidity &frame) {
+    MG_ASSERT(HasMoreResult());
 
-    auto frame = *valid_frames_it_;
+    frame = *own_frames_it_;
     frame[output_symbol_] = TypedValue(*current_vertex_it_);
 
     ++current_vertex_it_;
     if (current_vertex_it_ == current_batch_.end()) {
-      valid_frames_it_->MakeInvalid();
-      ++valid_frames_it_;
+      own_frames_it_->MakeInvalid();
+      ++own_frames_it_;
 
-      if (valid_frames_it_ == valid_frames_consumer_->end()) {
-        PrepareNextFrames(context);
-      } else {
-        current_vertex_it_ = current_batch_.begin();
+      current_vertex_it_ = current_batch_.begin();
+
+      if (own_frames_it_ == own_frames_consumer_->end()) {
+        return PullNextFrames(context);
       }
     };
-
-    return frame;
+    return true;
   }
 
   void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
     SCOPED_PROFILE_OP(op_name_);
 
-    if (!own_multi_frames_.has_value()) {
-      own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(),
-                                           kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
-      PrepareNextFrames(context);
+    if (!own_multi_frame_.has_value()) {
+      own_multi_frame_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe,
+                                          input_multi_frame.GetMemoryResource()));
+
+      MakeRequest(context);
+      PullNextFrames(context);
     }
 
-    while (true) {
+    if (!HasMoreResult()) {
+      return;
+    }
+
+    for (auto &frame : input_multi_frame.GetInvalidFramesPopulator()) {
       if (MustAbort(context)) {
         throw HintedAbortError();
       }
-
-      auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
-      auto invalid_frame_it = invalid_frames_populator.begin();
-      auto has_modified_at_least_one_frame = false;
-
-      while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) {
-        has_modified_at_least_one_frame = true;
-        *invalid_frame_it = GetNextFrame(context);
-        ++invalid_frame_it;
-      }
-
-      if (!has_modified_at_least_one_frame) {
+      if (!PopulateFrame(context, frame)) {
         return;
       }
     }
@@ -577,9 +568,9 @@ class DistributedScanAllAndFilterCursor : public Cursor {
   std::optional<storage::v3::LabelId> label_;
   std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
   std::optional<std::vector<Expression *>> filter_expressions_;
-  std::optional<MultiFrame> own_multi_frames_;
-  std::optional<ValidFramesConsumer> valid_frames_consumer_;
-  ValidFramesConsumer::Iterator valid_frames_it_;
+  std::optional<MultiFrame> own_multi_frame_;
+  std::optional<ValidFramesConsumer> own_frames_consumer_;
+  ValidFramesConsumer::Iterator own_frames_it_;
 };
 
 ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)

From 901da4c9b3ad3636aebb3bf4f0210f739ce35d60 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <benjamin.antal@memgraph.io>
Date: Tue, 17 Jan 2023 21:01:22 +0100
Subject: [PATCH 12/13] Update `InvalidFramesPopulator` to follow the
 conventions

---
 src/query/v2/multiframe.hpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index 28009eb5d..e286e00a3 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -255,9 +255,9 @@ class InvalidFramesPopulator {
   ~InvalidFramesPopulator() = default;
 
   InvalidFramesPopulator(const InvalidFramesPopulator &other) = delete;
-  InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = delete;
+  InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = default;
   InvalidFramesPopulator &operator=(const InvalidFramesPopulator &other) = delete;
-  InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = delete;
+  InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = default;
 
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;

From 575361827e9b3a1e9772deb76b02530ccf2dea23 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <benjamin.antal@memgraph.io>
Date: Tue, 17 Jan 2023 21:01:54 +0100
Subject: [PATCH 13/13] Add comment about invalid usage of `MutliFrame`

---
 src/query/v2/multiframe.hpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index e286e00a3..f464343b4 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -33,6 +33,7 @@ class MultiFrame {
   MultiFrame(size_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory);
   ~MultiFrame() = default;
 
+  // Assigning and moving the MultiFrame is not allowed if any accessor from the above ones are alive.
   MultiFrame(const MultiFrame &other);
   MultiFrame(MultiFrame &&other) noexcept;
   MultiFrame &operator=(const MultiFrame &other) = delete;