From af812d1311f75d936e1dd18c4dbec047ffc9c986 Mon Sep 17 00:00:00 2001
From: jeremy <jeremy.bailleux@memgraph.io>
Date: Tue, 13 Dec 2022 09:05:39 +0100
Subject: [PATCH] Implement scanAll MultiFrame version

---
 src/query/v2/multiframe.hpp    |  5 +-
 src/query/v2/plan/operator.cpp | 85 ++++++++++++++++++++++++++++++----
 2 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp
index e13eb07ac..3e063bdde 100644
--- a/src/query/v2/multiframe.hpp
+++ b/src/query/v2/multiframe.hpp
@@ -201,10 +201,9 @@ class ValidFramesConsumer {
 
   ~ValidFramesConsumer() noexcept;
   ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
-  ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
+  ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
   ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
-  ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
-
+  ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default;
   struct Iterator {
     using iterator_category = std::forward_iterator_tag;
     using difference_type = std::ptrdiff_t;
diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 09c0837c0..804312800 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -37,6 +37,7 @@
 #include "query/v2/db_accessor.hpp"
 #include "query/v2/exceptions.hpp"
 #include "query/v2/frontend/ast/ast.hpp"
+#include "query/v2/multiframe.hpp"
 #include "query/v2/path.hpp"
 #include "query/v2/plan/scoped_profile.hpp"
 #include "query/v2/request_router.hpp"
@@ -473,11 +474,11 @@ class DistributedScanAllAndFilterCursor : public Cursor {
       if (label_.has_value()) {
         request_label = request_router.LabelToName(*label_);
       }
-      current_batch = request_router.ScanVertices(request_label);
+      current_batch_ = request_router.ScanVertices(request_label);
     }
-    current_vertex_it = current_batch.begin();
+    current_vertex_it_ = current_batch_.begin();
     request_state_ = State::COMPLETED;
-    return !current_batch.empty();
+    return !current_batch_.empty();
   }
 
   bool Pull(Frame &frame, ExecutionContext &context) override {
@@ -495,23 +496,85 @@ class DistributedScanAllAndFilterCursor : public Cursor {
         }
       }
 
-      if (current_vertex_it == current_batch.end() &&
+      if (current_vertex_it_ == current_batch_.end() &&
           (request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
         ResetExecutionState();
         continue;
       }
 
-      frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
-      ++current_vertex_it;
+      frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
+      ++current_vertex_it_;
       return true;
     }
   }
 
+  void Generate(ExecutionContext &context) {
+    auto &request_router = *context.request_router;
+
+    input_cursor_->PullMultiple(*own_multi_frames_, context);
+
+    if (!MakeRequest(request_router, context)) {
+      return;
+    }
+
+    auto valid_frames_consumer = own_multi_frames_->GetValidFramesConsumer();
+    auto valid_frames_it = valid_frames_consumer.begin();
+
+    for (auto valid_frames_it = valid_frames_consumer.begin(); valid_frames_it != valid_frames_consumer.end();
+         ++valid_frames_it) {
+      for (auto vertex_it = current_batch_.begin(); vertex_it != current_batch_.end(); ++vertex_it) {
+        auto frame = *valid_frames_it;
+        frame[output_symbol_] = TypedValue(*vertex_it);
+        frames_buffer_.push(std::move(frame));
+      }
+      valid_frames_it->MakeInvalid();
+    }
+  }
+
+  void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
+    SCOPED_PROFILE_OP(op_name_);
+
+    if (!own_multi_frames_.has_value()) {
+      own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(),
+                                           kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
+    }
+
+    auto &request_router = *context.request_router;
+    auto should_make_request = false;
+    auto should_pull = false;
+    auto should_generate = false;
+
+    while (true) {
+      if (MustAbort(context)) {
+        throw HintedAbortError();
+      }
+
+      const auto should_generate = frames_buffer_.empty();
+      if (should_generate) {
+        Generate(context);
+      }
+
+      auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
+      auto invalid_frame_it = invalid_frames_populator.begin();
+      auto has_modified_at_least_one_frame = false;
+      while (invalid_frames_populator.end() != invalid_frame_it && !frames_buffer_.empty()) {
+        has_modified_at_least_one_frame = true;
+        *invalid_frame_it = frames_buffer_.front();
+        ++invalid_frame_it;
+        frames_buffer_.pop();
+      }
+
+      if (!has_modified_at_least_one_frame) {
+        return;
+      }
+    }
+  };
+
   void Shutdown() override { input_cursor_->Shutdown(); }
 
   void ResetExecutionState() {
-    current_batch.clear();
-    current_vertex_it = current_batch.end();
+    current_batch_.clear();
+    current_vertex_it_ = current_batch_.end();
     request_state_ = State::INITIALIZING;
   }
 
@@ -524,12 +587,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
   const Symbol output_symbol_;
   const UniqueCursorPtr input_cursor_;
   const char *op_name_;
-  std::vector<VertexAccessor> current_batch;
-  std::vector<VertexAccessor>::iterator current_vertex_it;
+  std::vector<VertexAccessor> current_batch_;
+  std::vector<VertexAccessor>::iterator current_vertex_it_;
   State request_state_ = State::INITIALIZING;
   std::optional<storage::v3::LabelId> label_;
   std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
   std::optional<std::vector<Expression *>> filter_expressions_;
+  std::optional<MultiFrame> own_multi_frames_;
+  std::queue<FrameWithValidity> frames_buffer_;
 };
 
 ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)