From 436e41f71ff5f5bb5ce6ca635d1251500641b466 Mon Sep 17 00:00:00 2001
From: gvolfing <gabor.volfinger@memgraph.io>
Date: Mon, 30 Jan 2023 13:06:05 +0100
Subject: [PATCH 1/6] Init POC of ScanByPrimaryKey multiframe

---
 src/query/v2/interpreter.cpp   |   3 +-
 src/query/v2/plan/operator.cpp | 157 ++++++++++++++++++++++++++++++++-
 2 files changed, 157 insertions(+), 3 deletions(-)

diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp
index aa220d764..fe7a17bf7 100644
--- a/src/query/v2/interpreter.cpp
+++ b/src/query/v2/interpreter.cpp
@@ -812,7 +812,8 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
 std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
                                                                 const std::vector<Symbol> &output_symbols,
                                                                 std::map<std::string, TypedValue> *summary) {
-  auto should_pull_multiple = false;  // TODO on the long term, we will only use PullMultiple
+  // auto should_pull_multiple = false;  // TODO on the long term, we will only use PullMultiple
+  auto should_pull_multiple = true;
   if (should_pull_multiple) {
     return PullMultiple(stream, n, output_symbols, summary);
   }
diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 50e5d4fca..c9bd2e9d0 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -603,8 +603,6 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
         filter_expressions_(filter_expressions),
         primary_key_(primary_key) {}
 
-  enum class State : int8_t { INITIALIZING, COMPLETED };
-
   using VertexAccessor = accessors::VertexAccessor;
 
   std::optional<VertexAccessor> MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router,
@@ -637,6 +635,83 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
     return VertexAccessor(vertex, properties, &request_router);
   }
 
+  // TODO (gvolfing) optinal vs empty vector for signaling failure?
+  bool MakeRequestSingleFrameTwo(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) {
+    // Evaluate the expressions that hold the PrimaryKey.
+    ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
+                                  storage::v3::View::NEW);
+
+    std::vector<msgs::Value> pk;
+    for (auto *primary_property : primary_key_) {
+      pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
+    }
+
+    msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
+
+    msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
+    auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
+      SCOPED_REQUEST_WAIT_PROFILE;
+      return request_router.GetProperties(req);
+    });
+    MG_ASSERT(get_prop_result.size() <= 1);
+
+    // {
+    //   SCOPED_REQUEST_WAIT_PROFILE;
+    //   std::optional<std::string> request_label = std::nullopt;
+    //   if (label_.has_value()) {
+    //     request_label = context.request_router->LabelToName(*label_);
+    //   }
+    //   current_batch_ = context.request_router->ScanVertices(request_label);
+    // }
+    // current_vertex_it_ = current_batch_.begin();
+    // return !current_batch_.empty(
+
+    if (get_prop_result.empty()) {
+      // return std::nullopt;
+      return false;
+    }
+
+    auto properties = get_prop_result[0].props;
+    // TODO (gvolfing) figure out labels when relevant.
+    msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
+
+    current_batch_ = {VertexAccessor(vertex, properties, &request_router)};
+    current_vertex_it_ = current_batch_.begin();
+    return current_batch_.empty();
+  }
+
+  //   std::vector<VertexAccessor> MakeRequest(Frame &frame, RequestRouterInterface &request_router,
+  //                                                      ExecutionContext &context) {
+  //   std::vector<VertexAccessor> ret;
+  //   // Evaluate the expressions that hold the PrimaryKey.
+  //   ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
+  //                                 storage::v3::View::NEW);
+
+  //   std::vector<msgs::Value> pk;
+  //   for (auto *primary_property : primary_key_) {
+  //     pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
+  //   }
+
+  //   msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
+
+  //   msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
+  //   auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
+  //     SCOPED_REQUEST_WAIT_PROFILE;
+  //     return request_router.GetProperties(req);
+  //   });
+  //   MG_ASSERT(get_prop_result.size() <= 1);
+
+  //   if (get_prop_result.empty()) {
+  //     return ret;
+  //   }
+  //   auto properties = get_prop_result[0].props;
+  //   // TODO (gvolfing) figure out labels when relevant.
+  //   msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
+  //   auto va = VertexAccessor(vertex, properties, &request_router);
+  //   ret.push_back(va);
+  //   return ret;
+  // }
+
   bool Pull(Frame &frame, ExecutionContext &context) override {
     SCOPED_PROFILE_OP(op_name_);
 
@@ -655,17 +730,95 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
     return false;
   }
 
+  bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
+    SCOPED_PROFILE_OP(op_name_);
+
+    if (!own_multi_frame_.has_value()) {
+      own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
+                                          kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
+      own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
+      own_frames_it_ = own_frames_consumer_->begin();
+    }
+
+    auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
+    auto populated_any = false;
+
+    while (true) {
+      switch (state_) {
+        case State::PullInput: {
+          if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
+            state_ = State::Exhausted;
+            return populated_any;
+          }
+          own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
+          own_frames_it_ = own_frames_consumer_->begin();
+          state_ = State::FetchVertices;
+          break;
+        }
+        case State::FetchVertices: {
+          if (own_frames_it_ == own_frames_consumer_->end()) {
+            state_ = State::PullInput;
+            continue;
+          }
+          if (!filter_expressions_->empty() || current_batch_.empty()) {
+            // MakeRequest(context);
+            MakeRequestSingleFrameTwo(*own_frames_it_, *context.request_router, context);
+          } else {
+            // We can reuse the vertices as they don't depend on any value from the frames
+            current_vertex_it_ = current_batch_.begin();
+          }
+          state_ = State::PopulateOutput;
+          break;
+        }
+        case State::PopulateOutput: {
+          if (!output_multi_frame.HasInvalidFrame()) {
+            return populated_any;
+          }
+          if (current_vertex_it_ == current_batch_.end()) {
+            own_frames_it_->MakeInvalid();
+            ++own_frames_it_;
+            state_ = State::FetchVertices;
+            continue;
+          }
+
+          for (auto output_frame_it = output_frames_populator.begin();
+               output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end();
+               ++output_frame_it) {
+            auto &output_frame = *output_frame_it;
+            output_frame = *own_frames_it_;
+            output_frame[output_symbol_] = TypedValue(*current_vertex_it_);
+            current_vertex_it_++;
+            populated_any = true;
+          }
+          break;
+        }
+        case State::Exhausted: {
+          return populated_any;
+        }
+      }
+    }
+    return populated_any;
+  };
+
   void Reset() override { input_cursor_->Reset(); }
 
   void Shutdown() override { input_cursor_->Shutdown(); }
 
  private:
+  enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted };
+
+  State state_{State::PullInput};
   const Symbol output_symbol_;
   const UniqueCursorPtr input_cursor_;
   const char *op_name_;
+  std::vector<VertexAccessor> current_batch_;
+  std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()};
   storage::v3::LabelId label_;
   std::optional<std::vector<Expression *>> filter_expressions_;
   std::vector<Expression *> primary_key_;
+  std::optional<MultiFrame> own_multi_frame_;
+  std::optional<ValidFramesConsumer> own_frames_consumer_;
+  ValidFramesConsumer::Iterator own_frames_it_;
 };
 
 ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)

From 60b71cc2c107eb63882cd3c1cc9616cf204b32ea Mon Sep 17 00:00:00 2001
From: gvolfing <gabor.volfinger@memgraph.io>
Date: Tue, 31 Jan 2023 17:30:31 +0100
Subject: [PATCH 2/6] Rework ScanByPrimaryKey operator - multiframe

---
 src/query/v2/plan/operator.cpp | 158 ++++++++++++++-------------------
 1 file changed, 65 insertions(+), 93 deletions(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index ad1674d29..20425de6a 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -635,83 +635,43 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
     return VertexAccessor(vertex, properties, &request_router);
   }
 
-  // TODO (gvolfing) optinal vs empty vector for signaling failure?
-  bool MakeRequestSingleFrameTwo(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) {
-    // Evaluate the expressions that hold the PrimaryKey.
-    ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
-                                  storage::v3::View::NEW);
+  void MakeRequestMultiFrame(MultiFrame &multi_frame, RequestRouterInterface &request_router,
+                             ExecutionContext &context) {
+    msgs::GetPropertiesRequest req;
+    const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
 
-    std::vector<msgs::Value> pk;
-    for (auto *primary_property : primary_key_) {
-      pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
+    std::unordered_set<msgs::VertexId> used_vertex_ids;
+
+    for (auto &frame : multi_frame.GetValidFramesModifier()) {
+      ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
+                                    storage::v3::View::NEW);
+
+      std::vector<msgs::Value> pk;
+      for (auto *primary_property : primary_key_) {
+        pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
+      }
+
+      auto vertex_id = std::make_pair(label, std::move(pk));
+      auto [it, inserted] = used_vertex_ids.emplace(std::move(vertex_id));
+      if (inserted) {
+        req.vertex_ids.emplace_back(*it);
+      }
     }
 
-    msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
-
-    msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
     auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
       SCOPED_REQUEST_WAIT_PROFILE;
       return request_router.GetProperties(req);
     });
-    MG_ASSERT(get_prop_result.size() <= 1);
 
-    // {
-    //   SCOPED_REQUEST_WAIT_PROFILE;
-    //   std::optional<std::string> request_label = std::nullopt;
-    //   if (label_.has_value()) {
-    //     request_label = context.request_router->LabelToName(*label_);
-    //   }
-    //   current_batch_ = context.request_router->ScanVertices(request_label);
-    // }
-    // current_vertex_it_ = current_batch_.begin();
-    // return !current_batch_.empty(
+    for (auto &result : get_prop_result) {
+      auto properties = result.props;
+      // TODO (gvolfing) figure out labels when relevant.
+      msgs::Vertex vertex = {.id = result.vertex, .labels = {}};
 
-    if (get_prop_result.empty()) {
-      // return std::nullopt;
-      return false;
+      id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(vertex, properties, &request_router));
     }
-
-    auto properties = get_prop_result[0].props;
-    // TODO (gvolfing) figure out labels when relevant.
-    msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
-
-    current_batch_ = {VertexAccessor(vertex, properties, &request_router)};
-    current_vertex_it_ = current_batch_.begin();
-    return current_batch_.empty();
   }
 
-  //   std::vector<VertexAccessor> MakeRequest(Frame &frame, RequestRouterInterface &request_router,
-  //                                                      ExecutionContext &context) {
-  //   std::vector<VertexAccessor> ret;
-  //   // Evaluate the expressions that hold the PrimaryKey.
-  //   ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
-  //                                 storage::v3::View::NEW);
-
-  //   std::vector<msgs::Value> pk;
-  //   for (auto *primary_property : primary_key_) {
-  //     pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
-  //   }
-
-  //   msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
-
-  //   msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
-  //   auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
-  //     SCOPED_REQUEST_WAIT_PROFILE;
-  //     return request_router.GetProperties(req);
-  //   });
-  //   MG_ASSERT(get_prop_result.size() <= 1);
-
-  //   if (get_prop_result.empty()) {
-  //     return ret;
-  //   }
-  //   auto properties = get_prop_result[0].props;
-  //   // TODO (gvolfing) figure out labels when relevant.
-  //   msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
-  //   auto va = VertexAccessor(vertex, properties, &request_router);
-  //   ret.push_back(va);
-  //   return ret;
-  // }
-
   bool Pull(Frame &frame, ExecutionContext &context) override {
     SCOPED_PROFILE_OP(op_name_);
 
@@ -730,15 +690,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
     return false;
   }
 
-  bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
-    SCOPED_PROFILE_OP(op_name_);
-
+  void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) {
     if (!own_multi_frame_.has_value()) {
       own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
                                           kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
       own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
       own_frames_it_ = own_frames_consumer_->begin();
     }
+    MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size());
+  }
+
+  bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
+    SCOPED_PROFILE_OP(op_name_);
+    EnsureOwnMultiFrameIsGood(output_multi_frame);
 
     auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
     auto populated_any = false;
@@ -746,49 +710,58 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
     while (true) {
       switch (state_) {
         case State::PullInput: {
+          id_to_accessor_mapping_.clear();
           if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
             state_ = State::Exhausted;
             return populated_any;
           }
           own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
           own_frames_it_ = own_frames_consumer_->begin();
-          state_ = State::FetchVertices;
-          break;
-        }
-        case State::FetchVertices: {
+
           if (own_frames_it_ == own_frames_consumer_->end()) {
-            state_ = State::PullInput;
             continue;
           }
-          if (!filter_expressions_->empty() || current_batch_.empty()) {
-            // MakeRequest(context);
-            MakeRequestSingleFrameTwo(*own_frames_it_, *context.request_router, context);
-          } else {
-            // We can reuse the vertices as they don't depend on any value from the frames
-            current_vertex_it_ = current_batch_.begin();
-          }
+
+          MakeRequestMultiFrame(*own_multi_frame_, *context.request_router, context);
+
           state_ = State::PopulateOutput;
           break;
         }
         case State::PopulateOutput: {
           if (!output_multi_frame.HasInvalidFrame()) {
+            if (own_frames_it_ == own_frames_consumer_->end()) {
+              id_to_accessor_mapping_.clear();
+            }
             return populated_any;
           }
-          if (current_vertex_it_ == current_batch_.end()) {
-            own_frames_it_->MakeInvalid();
-            ++own_frames_it_;
-            state_ = State::FetchVertices;
+
+          if (own_frames_it_ == own_frames_consumer_->end()) {
+            state_ = State::PullInput;
             continue;
           }
 
           for (auto output_frame_it = output_frames_populator.begin();
-               output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end();
-               ++output_frame_it) {
+               output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end();
+               ++own_frames_it_) {
             auto &output_frame = *output_frame_it;
-            output_frame = *own_frames_it_;
-            output_frame[output_symbol_] = TypedValue(*current_vertex_it_);
-            current_vertex_it_++;
-            populated_any = true;
+
+            ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context,
+                                          context.request_router, storage::v3::View::NEW);
+
+            std::vector<msgs::Value> pk;
+            for (auto *primary_property : primary_key_) {
+              pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
+            }
+
+            const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
+            auto vertex_id = std::make_pair(label, std::move(pk));
+
+            if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) {
+              output_frame = *own_frames_it_;
+              output_frame[output_symbol_] = TypedValue(it->second);
+              populated_any = true;
+              ++output_frame_it;
+            }
           }
           break;
         }
@@ -805,20 +778,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
   void Shutdown() override { input_cursor_->Shutdown(); }
 
  private:
-  enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted };
+  enum class State { PullInput, PopulateOutput, Exhausted };
 
   State state_{State::PullInput};
   const Symbol output_symbol_;
   const UniqueCursorPtr input_cursor_;
   const char *op_name_;
-  std::vector<VertexAccessor> current_batch_;
-  std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()};
   storage::v3::LabelId label_;
   std::optional<std::vector<Expression *>> filter_expressions_;
   std::vector<Expression *> primary_key_;
   std::optional<MultiFrame> own_multi_frame_;
   std::optional<ValidFramesConsumer> own_frames_consumer_;
   ValidFramesConsumer::Iterator own_frames_it_;
+  std::unordered_map<msgs::VertexId, VertexAccessor> id_to_accessor_mapping_;
 };
 
 ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)

From 7d63236f871ec60d94c1dff4794ef028783456a6 Mon Sep 17 00:00:00 2001
From: gvolfing <gabor.volfinger@memgraph.io>
Date: Tue, 31 Jan 2023 17:36:52 +0100
Subject: [PATCH 3/6] Set the default pull-mechanism back to single-pull

---
 src/query/v2/interpreter.cpp | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp
index fe7a17bf7..aa220d764 100644
--- a/src/query/v2/interpreter.cpp
+++ b/src/query/v2/interpreter.cpp
@@ -812,8 +812,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
 std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
                                                                 const std::vector<Symbol> &output_symbols,
                                                                 std::map<std::string, TypedValue> *summary) {
-  // auto should_pull_multiple = false;  // TODO on the long term, we will only use PullMultiple
-  auto should_pull_multiple = true;
+  auto should_pull_multiple = false;  // TODO on the long term, we will only use PullMultiple
   if (should_pull_multiple) {
     return PullMultiple(stream, n, output_symbols, summary);
   }

From 4be4a86d0a932a2242c15967844d9ce3eb1c1676 Mon Sep 17 00:00:00 2001
From: gvolfing <107616712+gvolfing@users.noreply.github.com>
Date: Wed, 1 Feb 2023 11:39:48 +0100
Subject: [PATCH 4/6] Apply suggestions from code review
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: János Benjamin Antal <antaljanosbenjamin@users.noreply.github.com>
---
 src/query/v2/plan/operator.cpp | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 20425de6a..4a512fcd6 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -664,11 +664,10 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
     });
 
     for (auto &result : get_prop_result) {
-      auto properties = result.props;
       // TODO (gvolfing) figure out labels when relevant.
       msgs::Vertex vertex = {.id = result.vertex, .labels = {}};
 
-      id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(vertex, properties, &request_router));
+      id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(std::move(vertex), std::move(result.properties), &request_router));
     }
   }
 

From bf93b53e7dcec4bbad54fe17968ce2d16b499a66 Mon Sep 17 00:00:00 2001
From: gvolfing <gabor.volfinger@memgraph.io>
Date: Wed, 1 Feb 2023 12:36:27 +0100
Subject: [PATCH 5/6] Fix compile error due to wrong aggregate field name

---
 src/query/v2/plan/operator.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 4a512fcd6..86b804db2 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -667,7 +667,8 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
       // TODO (gvolfing) figure out labels when relevant.
       msgs::Vertex vertex = {.id = result.vertex, .labels = {}};
 
-      id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(std::move(vertex), std::move(result.properties), &request_router));
+      id_to_accessor_mapping_.emplace(result.vertex,
+                                      VertexAccessor(std::move(vertex), std::move(result.props), &request_router));
     }
   }
 

From 8e315875f2f5aa08c60cc27dee29df5195399220 Mon Sep 17 00:00:00 2001
From: gvolfing <107616712+gvolfing@users.noreply.github.com>
Date: Thu, 2 Feb 2023 07:44:47 +0100
Subject: [PATCH 6/6] Update src/query/v2/plan/operator.cpp
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: János Benjamin Antal <antaljanosbenjamin@users.noreply.github.com>
---
 src/query/v2/plan/operator.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 86b804db2..1139c0fbd 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -761,7 +761,8 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
               output_frame[output_symbol_] = TypedValue(it->second);
               populated_any = true;
               ++output_frame_it;
-            }
+            }           
+            own_frames_it_->MakeInvalid();
           }
           break;
         }