From 59c7d81ae85216d8251183960f0b1bb57ebd1b17 Mon Sep 17 00:00:00 2001
From: Kostas Kyrimis <kostaskyrim@gmail.com>
Date: Mon, 31 Oct 2022 11:52:20 +0200
Subject: [PATCH] [:pineapple: < T1086-MG] Test distributed operators e2e
 (#607)

* Fix Explain queries
* Add Vertex/Edge accessor support for properties
* Fix projections
* Fix expansions to fetch destination vertex properties
* Fix improper use of ShardMap on bolt and replaced it with the ShardRequestManager
* Add NameToId mappers on ShardRequestManager
* Add e2e tests for operators
* Fix OPTIONAL MATCH
---
 .github/workflows/diff.yaml                   |   9 +
 src/coordinator/shard_map.cpp                 |   5 +-
 src/coordinator/shard_map.hpp                 |   3 +-
 src/expr/ast.hpp                              |   4 +-
 src/expr/interpret/eval.hpp                   |   4 +-
 src/glue/v2/communication.cpp                 |  46 +++--
 src/glue/v2/communication.hpp                 |  36 ++--
 src/memgraph.cpp                              |  25 ++-
 src/query/v2/accessors.cpp                    |  68 ++++---
 src/query/v2/accessors.hpp                    |  15 +-
 src/query/v2/bindings/eval.hpp                |  15 +-
 src/query/v2/conversions.hpp                  |  12 +-
 .../interpret/awesome_memgraph_functions.cpp  |   2 +-
 .../interpret/awesome_memgraph_functions.hpp  |  11 +-
 src/query/v2/interpreter.cpp                  |  58 +++---
 src/query/v2/interpreter.hpp                  |   2 +
 src/query/v2/plan/operator.cpp                | 170 +++++++++++-------
 src/query/v2/requests.hpp                     |   1 -
 src/query/v2/shard_request_manager.hpp        |  37 +++-
 src/storage/v3/name_id_mapper.hpp             |   4 +
 src/storage/v3/shard_rsm.cpp                  |  52 ++++--
 src/utils/print_helpers.hpp                   |  16 +-
 tests/e2e/distributed_queries/CMakeLists.txt  |   5 +
 tests/e2e/distributed_queries/common.py       |  44 +++++
 tests/e2e/distributed_queries/distinct.py     |  38 ++++
 .../distributed_queries.py                    |  31 +---
 .../e2e/distributed_queries/optional_match.py |  37 ++++
 .../distributed_queries/order_by_and_limit.py |  44 +++++
 .../e2e/distributed_queries/unwind_collect.py |  34 ++++
 tests/e2e/distributed_queries/workloads.yaml  |  20 +++
 tests/setup.sh                                |  14 +-
 31 files changed, 602 insertions(+), 260 deletions(-)
 create mode 100644 tests/e2e/distributed_queries/common.py
 create mode 100644 tests/e2e/distributed_queries/distinct.py
 create mode 100644 tests/e2e/distributed_queries/optional_match.py
 create mode 100644 tests/e2e/distributed_queries/order_by_and_limit.py
 create mode 100644 tests/e2e/distributed_queries/unwind_collect.py

diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml
index 00aca7f5c..ef5cf2ee2 100644
--- a/.github/workflows/diff.yaml
+++ b/.github/workflows/diff.yaml
@@ -219,3 +219,12 @@ jobs:
           # Run simulation tests.
           cd build
           ctest -R memgraph__simulation --output-on-failure -j$THREADS
+
+      - name: Run e2e tests
+        run: |
+          # TODO(gitbuda): Setup mgclient and pymgclient properly.
+          cd tests
+          ./setup.sh
+          source ve3/bin/activate
+          cd e2e
+          LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-root-directory ./distributed_queries
diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp
index bd21dea3e..746f70368 100644
--- a/src/coordinator/shard_map.cpp
+++ b/src/coordinator/shard_map.cpp
@@ -9,6 +9,7 @@
 // by the Apache License, Version 2.0, included in the file
 // licenses/APL.txt.
 
+#include <optional>
 #include <unordered_map>
 #include <vector>
 
@@ -365,7 +366,6 @@ std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
   if (const auto it = labels.find(label); it != labels.end()) {
     return it->second;
   }
-
   return std::nullopt;
 }
 
@@ -382,7 +382,6 @@ std::optional<PropertyId> ShardMap::GetPropertyId(const std::string &property_na
   if (const auto it = properties.find(property_name); it != properties.end()) {
     return it->second;
   }
-
   return std::nullopt;
 }
 
@@ -399,7 +398,6 @@ std::optional<EdgeTypeId> ShardMap::GetEdgeTypeId(const std::string &edge_type)
   if (const auto it = edge_types.find(edge_type); it != edge_types.end()) {
     return it->second;
   }
-
   return std::nullopt;
 }
 
@@ -411,6 +409,7 @@ const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const {
   }
   throw utils::BasicException("EdgeTypeId not found!");
 }
+
 Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key,
                                    const PrimaryKey &end_key) const {
   MG_ASSERT(start_key <= end_key);
diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp
index 63274aa76..d8b3e0f6a 100644
--- a/src/coordinator/shard_map.hpp
+++ b/src/coordinator/shard_map.hpp
@@ -25,6 +25,7 @@
 #include "io/address.hpp"
 #include "storage/v3/config.hpp"
 #include "storage/v3/id_types.hpp"
+#include "storage/v3/name_id_mapper.hpp"
 #include "storage/v3/property_value.hpp"
 #include "storage/v3/schemas.hpp"
 #include "storage/v3/temporal.hpp"
@@ -120,9 +121,9 @@ struct ShardMap {
   std::map<PropertyName, PropertyId> properties;
   std::map<EdgeTypeName, EdgeTypeId> edge_types;
   uint64_t max_label_id{kNotExistingId};
-  std::map<LabelName, LabelId> labels;
   std::map<LabelId, LabelSpace> label_spaces;
   std::map<LabelId, std::vector<SchemaProperty>> schemas;
+  std::map<LabelName, LabelId> labels;
 
   [[nodiscard]] static ShardMap Parse(std::istream &input_stream);
   friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map);
diff --git a/src/expr/ast.hpp b/src/expr/ast.hpp
index 211d23ff4..070315a76 100644
--- a/src/expr/ast.hpp
+++ b/src/expr/ast.hpp
@@ -13,7 +13,7 @@
 #ifndef MG_AST_INCLUDE_PATH
 #ifdef MG_CLANG_TIDY_CHECK
 // NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
-#define MG_AST_INCLUDE_PATH "query/v2/frontend/ast/ast.hpp"
+#include "query/v2/bindings/bindings.hpp"
 #else
 #error Missing AST include path
 #endif
@@ -21,8 +21,6 @@
 
 #ifndef MG_INJECTED_NAMESPACE_NAME
 #ifdef MG_CLANG_TIDY_CHECK
-// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
-#define MG_INJECTED_NAMESPACE_NAME memgraph::query::v2
 #else
 #error Missing AST namespace
 #endif
diff --git a/src/expr/interpret/eval.hpp b/src/expr/interpret/eval.hpp
index 253c2e86d..1538028a0 100644
--- a/src/expr/interpret/eval.hpp
+++ b/src/expr/interpret/eval.hpp
@@ -718,7 +718,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
   TReturnType GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) {
     auto maybe_prop = record_accessor.GetProperty(prop.name);
     // Handler non existent property
-    return conv_(maybe_prop);
+    return conv_(maybe_prop, dba_);
   }
 
   template <class TRecordAccessor, class TTag = Tag,
@@ -726,7 +726,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
   TReturnType GetProperty(const TRecordAccessor &record_accessor, const std::string_view name) {
     auto maybe_prop = record_accessor.GetProperty(std::string(name));
     // Handler non existent property
-    return conv_(maybe_prop);
+    return conv_(maybe_prop, dba_);
   }
 
   template <class TRecordAccessor, class TTag = Tag,
diff --git a/src/glue/v2/communication.cpp b/src/glue/v2/communication.cpp
index ffafc4a59..ebca8c23f 100644
--- a/src/glue/v2/communication.cpp
+++ b/src/glue/v2/communication.cpp
@@ -18,6 +18,7 @@
 #include "coordinator/shard_map.hpp"
 #include "query/v2/accessors.hpp"
 #include "query/v2/requests.hpp"
+#include "query/v2/shard_request_manager.hpp"
 #include "storage/v3/edge_accessor.hpp"
 #include "storage/v3/id_types.hpp"
 #include "storage/v3/result.hpp"
@@ -70,51 +71,52 @@ query::v2::TypedValue ToTypedValue(const Value &value) {
   }
 }
 
-storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex,
-                                                              const coordinator::ShardMap &shard_map,
-                                                              storage::v3::View /*view*/) {
+storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(
+    const query::v2::accessors::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager,
+    storage::v3::View /*view*/) {
   auto id = communication::bolt::Id::FromUint(0);
 
   auto labels = vertex.Labels();
   std::vector<std::string> new_labels;
   new_labels.reserve(labels.size());
   for (const auto &label : labels) {
-    new_labels.push_back(shard_map.GetLabelName(label.id));
+    new_labels.push_back(shard_request_manager->LabelToName(label.id));
   }
 
   auto properties = vertex.Properties();
   std::map<std::string, Value> new_properties;
   for (const auto &[prop, property_value] : properties) {
-    new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value);
+    new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value);
   }
   return communication::bolt::Vertex{id, new_labels, new_properties};
 }
 
-storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge,
-                                                          const coordinator::ShardMap &shard_map,
-                                                          storage::v3::View /*view*/) {
+storage::v3::Result<communication::bolt::Edge> ToBoltEdge(
+    const query::v2::accessors::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager,
+    storage::v3::View /*view*/) {
   // TODO(jbajic) Fix bolt communication
   auto id = communication::bolt::Id::FromUint(0);
   auto from = communication::bolt::Id::FromUint(0);
   auto to = communication::bolt::Id::FromUint(0);
-  const auto &type = shard_map.GetEdgeTypeName(edge.EdgeType());
+  const auto &type = shard_request_manager->EdgeTypeToName(edge.EdgeType());
 
   auto properties = edge.Properties();
   std::map<std::string, Value> new_properties;
   for (const auto &[prop, property_value] : properties) {
-    new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value);
+    new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value);
   }
   return communication::bolt::Edge{id, from, to, type, new_properties};
 }
 
-storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path & /*edge*/,
-                                                          const coordinator::ShardMap & /*shard_map*/,
-                                                          storage::v3::View /*view*/) {
+storage::v3::Result<communication::bolt::Path> ToBoltPath(
+    const query::v2::accessors::Path & /*edge*/, const msgs::ShardRequestManagerInterface * /*shard_request_manager*/,
+    storage::v3::View /*view*/) {
   // TODO(jbajic) Fix bolt communication
   return {storage::v3::Error::DELETED_OBJECT};
 }
 
-storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const coordinator::ShardMap &shard_map,
+storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value,
+                                       const msgs::ShardRequestManagerInterface *shard_request_manager,
                                        storage::v3::View view) {
   switch (value.type()) {
     case query::v2::TypedValue::Type::Null:
@@ -131,7 +133,7 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
       std::vector<Value> values;
       values.reserve(value.ValueList().size());
       for (const auto &v : value.ValueList()) {
-        auto maybe_value = ToBoltValue(v, shard_map, view);
+        auto maybe_value = ToBoltValue(v, shard_request_manager, view);
         if (maybe_value.HasError()) return maybe_value.GetError();
         values.emplace_back(std::move(*maybe_value));
       }
@@ -140,24 +142,24 @@ storage::v3::Result<Value> ToBoltValue(const query::v2::TypedValue &value, const
     case query::v2::TypedValue::Type::Map: {
       std::map<std::string, Value> map;
       for (const auto &kv : value.ValueMap()) {
-        auto maybe_value = ToBoltValue(kv.second, shard_map, view);
+        auto maybe_value = ToBoltValue(kv.second, shard_request_manager, view);
         if (maybe_value.HasError()) return maybe_value.GetError();
         map.emplace(kv.first, std::move(*maybe_value));
       }
       return Value(std::move(map));
     }
     case query::v2::TypedValue::Type::Vertex: {
-      auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_map, view);
+      auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_request_manager, view);
       if (maybe_vertex.HasError()) return maybe_vertex.GetError();
       return Value(std::move(*maybe_vertex));
     }
     case query::v2::TypedValue::Type::Edge: {
-      auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_map, view);
+      auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_request_manager, view);
       if (maybe_edge.HasError()) return maybe_edge.GetError();
       return Value(std::move(*maybe_edge));
     }
     case query::v2::TypedValue::Type::Path: {
-      auto maybe_path = ToBoltPath(value.ValuePath(), shard_map, view);
+      auto maybe_path = ToBoltPath(value.ValuePath(), shard_request_manager, view);
       if (maybe_path.HasError()) return maybe_path.GetError();
       return Value(std::move(*maybe_path));
     }
@@ -209,12 +211,6 @@ Value ToBoltValue(msgs::Value value) {
   }
 }
 
-storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path & /*path*/,
-                                                          const storage::v3::Shard & /*db*/,
-                                                          storage::v3::View /*view*/) {
-  return communication::bolt::Path();
-}
-
 storage::v3::PropertyValue ToPropertyValue(const Value &value) {
   switch (value.type()) {
     case Value::Type::Null:
diff --git a/src/glue/v2/communication.hpp b/src/glue/v2/communication.hpp
index 67a951c6f..ea9c6b4c9 100644
--- a/src/glue/v2/communication.hpp
+++ b/src/glue/v2/communication.hpp
@@ -15,6 +15,7 @@
 #include "communication/bolt/v1/value.hpp"
 #include "coordinator/shard_map.hpp"
 #include "query/v2/bindings/typed_value.hpp"
+#include "query/v2/shard_request_manager.hpp"
 #include "storage/v3/property_value.hpp"
 #include "storage/v3/result.hpp"
 #include "storage/v3/shard.hpp"
@@ -30,40 +31,40 @@ namespace memgraph::glue::v2 {
 
 /// @param storage::v3::VertexAccessor for converting to
 ///        communication::bolt::Vertex.
-/// @param coordinator::ShardMap shard_map getting label and property names.
+/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting label and property names.
 /// @param storage::v3::View for deciding which vertex attributes are visible.
 ///
 /// @throw std::bad_alloc
-storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(const storage::v3::VertexAccessor &vertex,
-                                                              const coordinator::ShardMap &shard_map,
-                                                              storage::v3::View view);
+storage::v3::Result<communication::bolt::Vertex> ToBoltVertex(
+    const storage::v3::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager,
+    storage::v3::View view);
 
 /// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge.
-/// @param coordinator::ShardMap shard_map getting edge type and property names.
+/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting edge type and property names.
 /// @param storage::v3::View for deciding which edge attributes are visible.
 ///
 /// @throw std::bad_alloc
-storage::v3::Result<communication::bolt::Edge> ToBoltEdge(const storage::v3::EdgeAccessor &edge,
-                                                          const coordinator::ShardMap &shard_map,
-                                                          storage::v3::View view);
+storage::v3::Result<communication::bolt::Edge> ToBoltEdge(
+    const storage::v3::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager,
+    storage::v3::View view);
 
 /// @param query::v2::Path for converting to communication::bolt::Path.
-/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge.
+/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
 /// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
 ///
 /// @throw std::bad_alloc
-storage::v3::Result<communication::bolt::Path> ToBoltPath(const query::v2::accessors::Path &path,
-                                                          const coordinator::ShardMap &shard_map,
-                                                          storage::v3::View view);
+storage::v3::Result<communication::bolt::Path> ToBoltPath(
+    const query::v2::accessors::Path &path, const msgs::ShardRequestManagerInterface *shard_request_manager,
+    storage::v3::View view);
 
 /// @param query::v2::TypedValue for converting to communication::bolt::Value.
-/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge.
+/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
 /// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
 ///
 /// @throw std::bad_alloc
-storage::v3::Result<communication::bolt::Value> ToBoltValue(const query::v2::TypedValue &value,
-                                                            const coordinator::ShardMap &shard_map,
-                                                            storage::v3::View view);
+storage::v3::Result<communication::bolt::Value> ToBoltValue(
+    const query::v2::TypedValue &value, const msgs::ShardRequestManagerInterface *shard_request_manager,
+    storage::v3::View view);
 
 query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value);
 
@@ -73,7 +74,8 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val
 
 communication::bolt::Value ToBoltValue(msgs::Value value);
 
-communication::bolt::Value ToBoltValue(msgs::Value value, const coordinator::ShardMap &shard_map,
+communication::bolt::Value ToBoltValue(msgs::Value value,
+                                       const msgs::ShardRequestManagerInterface *shard_request_manager,
                                        storage::v3::View view);
 
 }  // namespace memgraph::glue::v2
diff --git a/src/memgraph.cpp b/src/memgraph.cpp
index c696615b5..301b3ed36 100644
--- a/src/memgraph.cpp
+++ b/src/memgraph.cpp
@@ -407,9 +407,8 @@ DEFINE_string(organization_name, "", "Organization name.");
 struct SessionData {
   // Explicit constructor here to ensure that pointers to all objects are
   // supplied.
-  SessionData(memgraph::coordinator::ShardMap &shard_map, memgraph::query::v2::InterpreterContext *interpreter_context)
-      : shard_map(&shard_map), interpreter_context(interpreter_context) {}
-  memgraph::coordinator::ShardMap *shard_map;
+  explicit SessionData(memgraph::query::v2::InterpreterContext *interpreter_context)
+      : interpreter_context(interpreter_context) {}
   memgraph::query::v2::InterpreterContext *interpreter_context;
 };
 
@@ -424,7 +423,6 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
               memgraph::communication::v2::OutputStream *output_stream)
       : memgraph::communication::bolt::Session<memgraph::communication::v2::InputStream,
                                                memgraph::communication::v2::OutputStream>(input_stream, output_stream),
-        shard_map_(data.shard_map),
         interpreter_(data.interpreter_context),
         endpoint_(endpoint) {}
 
@@ -455,7 +453,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
 
   std::map<std::string, memgraph::communication::bolt::Value> Pull(TEncoder *encoder, std::optional<int> n,
                                                                    std::optional<int> qid) override {
-    TypedValueResultStream stream(encoder, *shard_map_);
+    TypedValueResultStream stream(encoder, interpreter_.GetShardRequestManager());
     return PullResults(stream, n, qid);
   }
 
@@ -482,7 +480,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
       const auto &summary = interpreter_.Pull(&stream, n, qid);
       std::map<std::string, memgraph::communication::bolt::Value> decoded_summary;
       for (const auto &kv : summary) {
-        auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, *shard_map_, memgraph::storage::v3::View::NEW);
+        auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetShardRequestManager(),
+                                                           memgraph::storage::v3::View::NEW);
         if (maybe_value.HasError()) {
           switch (maybe_value.GetError()) {
             case memgraph::storage::v3::Error::DELETED_OBJECT:
@@ -507,14 +506,14 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
   /// before forwarding the calls to original TEncoder.
   class TypedValueResultStream {
    public:
-    TypedValueResultStream(TEncoder *encoder, const memgraph::coordinator::ShardMap &shard_map)
-        : encoder_(encoder), shard_map_(&shard_map) {}
+    TypedValueResultStream(TEncoder *encoder, const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager)
+        : encoder_(encoder), shard_request_manager_(shard_request_manager) {}
 
     void Result(const std::vector<memgraph::query::v2::TypedValue> &values) {
       std::vector<memgraph::communication::bolt::Value> decoded_values;
       decoded_values.reserve(values.size());
       for (const auto &v : values) {
-        auto maybe_value = memgraph::glue::v2::ToBoltValue(v, *shard_map_, memgraph::storage::v3::View::NEW);
+        auto maybe_value = memgraph::glue::v2::ToBoltValue(v, shard_request_manager_, memgraph::storage::v3::View::NEW);
         if (maybe_value.HasError()) {
           switch (maybe_value.GetError()) {
             case memgraph::storage::v3::Error::DELETED_OBJECT:
@@ -534,12 +533,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
 
    private:
     TEncoder *encoder_;
-    // NOTE: Needed only for ToBoltValue conversions
-    const memgraph::coordinator::ShardMap *shard_map_;
+    const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager_{nullptr};
   };
-
-  // NOTE: Needed only for ToBoltValue conversions
-  const memgraph::coordinator::ShardMap *shard_map_;
   memgraph::query::v2::Interpreter interpreter_;
   memgraph::communication::v2::ServerEndpoint endpoint_;
 };
@@ -680,7 +675,7 @@ int main(int argc, char **argv) {
       std::move(io),
       mm.CoordinatorAddress()};
 
-  SessionData session_data{sm, &interpreter_context};
+  SessionData session_data{&interpreter_context};
 
   interpreter_context.auth = nullptr;
   interpreter_context.auth_checker = nullptr;
diff --git a/src/query/v2/accessors.cpp b/src/query/v2/accessors.cpp
index 5391f1384..cbdce89e0 100644
--- a/src/query/v2/accessors.cpp
+++ b/src/query/v2/accessors.cpp
@@ -11,38 +11,59 @@
 
 #include "query/v2/accessors.hpp"
 #include "query/v2/requests.hpp"
+#include "query/v2/shard_request_manager.hpp"
 #include "storage/v3/id_types.hpp"
 
 namespace memgraph::query::v2::accessors {
-EdgeAccessor::EdgeAccessor(Edge edge) : edge(std::move(edge)) {}
+EdgeAccessor::EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager)
+    : edge(std::move(edge)), manager_(manager) {}
 
 EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; }
 
-const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const {
-  return edge.properties;
-  //    std::map<std::string, TypedValue> res;
-  //    for (const auto &[name, value] : *properties) {
-  //      res[name] = ValueToTypedValue(value);
-  //    }
-  //    return res;
-}
+const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const { return edge.properties; }
 
-// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
-Value EdgeAccessor::GetProperty(const std::string & /*prop_name*/) const {
-  // TODO(kostasrim) fix this
-  return {};
+Value EdgeAccessor::GetProperty(const std::string &prop_name) const {
+  auto prop_id = manager_->NameToProperty(prop_name);
+  auto it = std::find_if(edge.properties.begin(), edge.properties.end(), [&](auto &pr) { return prop_id == pr.first; });
+  if (it == edge.properties.end()) {
+    return {};
+  }
+  return it->second;
 }
 
 const Edge &EdgeAccessor::GetEdge() const { return edge; }
 
 bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; };
 
-VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); }
+VertexAccessor EdgeAccessor::To() const {
+  return VertexAccessor(Vertex{edge.dst}, std::vector<std::pair<PropertyId, msgs::Value>>{}, manager_);
+}
 
-VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); }
+VertexAccessor EdgeAccessor::From() const {
+  return VertexAccessor(Vertex{edge.src}, std::vector<std::pair<PropertyId, msgs::Value>>{}, manager_);
+}
 
-VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props)
-    : vertex(std::move(v)), properties(std::move(props)) {}
+VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
+                               const msgs::ShardRequestManagerInterface *manager)
+    : vertex(std::move(v)), properties(std::move(props)), manager_(manager) {}
+
+VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props,
+                               const msgs::ShardRequestManagerInterface *manager)
+    : vertex(std::move(v)), manager_(manager) {
+  properties.reserve(props.size());
+  for (auto &[id, value] : props) {
+    properties.emplace_back(std::make_pair(id, std::move(value)));
+  }
+}
+
+VertexAccessor::VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props,
+                               const msgs::ShardRequestManagerInterface *manager)
+    : vertex(std::move(v)), manager_(manager) {
+  properties.reserve(props.size());
+  for (const auto &[id, value] : props) {
+    properties.emplace_back(std::make_pair(id, value));
+  }
+}
 
 Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; }
 
@@ -58,15 +79,16 @@ bool VertexAccessor::HasLabel(Label &label) const {
 const std::vector<std::pair<PropertyId, Value>> &VertexAccessor::Properties() const { return properties; }
 
 Value VertexAccessor::GetProperty(PropertyId prop_id) const {
-  return std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; })->second;
-  //    return ValueToTypedValue(properties[prop_name]);
+  auto it = std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; });
+  if (it == properties.end()) {
+    return {};
+  }
+  return it->second;
 }
 
 // NOLINTNEXTLINE(readability-convert-member-functions-to-static)
-Value VertexAccessor::GetProperty(const std::string & /*prop_name*/) const {
-  // TODO(kostasrim) Add string mapping
-  return {};
-  //    return ValueToTypedValue(properties[prop_name]);
+Value VertexAccessor::GetProperty(const std::string &prop_name) const {
+  return GetProperty(manager_->NameToProperty(prop_name));
 }
 
 msgs::Vertex VertexAccessor::GetVertex() const { return vertex; }
diff --git a/src/query/v2/accessors.hpp b/src/query/v2/accessors.hpp
index eed169b8a..8e10c865d 100644
--- a/src/query/v2/accessors.hpp
+++ b/src/query/v2/accessors.hpp
@@ -11,6 +11,7 @@
 
 #pragma once
 
+#include <map>
 #include <optional>
 #include <utility>
 #include <vector>
@@ -23,6 +24,10 @@
 #include "utils/memory.hpp"
 #include "utils/memory_tracker.hpp"
 
+namespace memgraph::msgs {
+class ShardRequestManagerInterface;
+}  // namespace memgraph::msgs
+
 namespace memgraph::query::v2::accessors {
 
 using Value = memgraph::msgs::Value;
@@ -36,7 +41,7 @@ class VertexAccessor;
 
 class EdgeAccessor final {
  public:
-  explicit EdgeAccessor(Edge edge);
+  explicit EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager);
 
   [[nodiscard]] EdgeTypeId EdgeType() const;
 
@@ -64,6 +69,7 @@ class EdgeAccessor final {
 
  private:
   Edge edge;
+  const msgs::ShardRequestManagerInterface *manager_;
 };
 
 class VertexAccessor final {
@@ -71,7 +77,11 @@ class VertexAccessor final {
   using PropertyId = msgs::PropertyId;
   using Label = msgs::Label;
   using VertexId = msgs::VertexId;
-  VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props);
+  VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
+                 const msgs::ShardRequestManagerInterface *manager);
+
+  VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const msgs::ShardRequestManagerInterface *manager);
+  VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, const msgs::ShardRequestManagerInterface *manager);
 
   [[nodiscard]] Label PrimaryLabel() const;
 
@@ -140,6 +150,7 @@ class VertexAccessor final {
  private:
   Vertex vertex;
   std::vector<std::pair<PropertyId, Value>> properties;
+  const msgs::ShardRequestManagerInterface *manager_;
 };
 
 // inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }
diff --git a/src/query/v2/bindings/eval.hpp b/src/query/v2/bindings/eval.hpp
index 8c4ad6d34..52455bdb2 100644
--- a/src/query/v2/bindings/eval.hpp
+++ b/src/query/v2/bindings/eval.hpp
@@ -23,6 +23,10 @@
 #include "storage/v3/property_value.hpp"
 #include "storage/v3/view.hpp"
 
+namespace memgraph::msgs {
+class ShardRequestManagerInterface;
+} // namespace memgraph::msgs
+
 namespace memgraph::query::v2 {
 
 inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); };
@@ -32,13 +36,14 @@ class Callable {
   auto operator()(const memgraph::storage::v3::PropertyValue &val) const {
     return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val);
   };
-  auto operator()(const msgs::Value &val) const { return ValueToTypedValue(val); };
+  auto operator()(const msgs::Value &val, memgraph::msgs::ShardRequestManagerInterface *manager) const {
+    return ValueToTypedValue(val, manager);
+  };
 };
 
 }  // namespace detail
-using ExpressionEvaluator =
-    memgraph::expr::ExpressionEvaluator<TypedValue, memgraph::query::v2::EvaluationContext, DbAccessor,
-                                        storage::v3::View, storage::v3::LabelId, msgs::Value, detail::Callable,
-                                        memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
+using ExpressionEvaluator = memgraph::expr::ExpressionEvaluator<
+    TypedValue, memgraph::query::v2::EvaluationContext, memgraph::msgs::ShardRequestManagerInterface, storage::v3::View,
+    storage::v3::LabelId, msgs::Value, detail::Callable, memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>;
 
 }  // namespace memgraph::query::v2
diff --git a/src/query/v2/conversions.hpp b/src/query/v2/conversions.hpp
index 0c67c794b..10299c919 100644
--- a/src/query/v2/conversions.hpp
+++ b/src/query/v2/conversions.hpp
@@ -13,10 +13,11 @@
 #include "bindings/typed_value.hpp"
 #include "query/v2/accessors.hpp"
 #include "query/v2/requests.hpp"
+#include "query/v2/shard_request_manager.hpp"
 
 namespace memgraph::query::v2 {
 
-inline TypedValue ValueToTypedValue(const msgs::Value &value) {
+inline TypedValue ValueToTypedValue(const msgs::Value &value, msgs::ShardRequestManagerInterface *manager) {
   using Value = msgs::Value;
   switch (value.type) {
     case Value::Type::Null:
@@ -34,7 +35,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
       std::vector<TypedValue> dst;
       dst.reserve(lst.size());
       for (const auto &elem : lst) {
-        dst.push_back(ValueToTypedValue(elem));
+        dst.push_back(ValueToTypedValue(elem, manager));
       }
       return TypedValue(std::move(dst));
     }
@@ -42,14 +43,15 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) {
       const auto &value_map = value.map_v;
       std::map<std::string, TypedValue> dst;
       for (const auto &[key, val] : value_map) {
-        dst[key] = ValueToTypedValue(val);
+        dst[key] = ValueToTypedValue(val, manager);
       }
       return TypedValue(std::move(dst));
     }
     case Value::Type::Vertex:
-      return TypedValue(accessors::VertexAccessor(value.vertex_v, {}));
+      return TypedValue(accessors::VertexAccessor(
+          value.vertex_v, std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>{}, manager));
     case Value::Type::Edge:
-      return TypedValue(accessors::EdgeAccessor(value.edge_v));
+      return TypedValue(accessors::EdgeAccessor(value.edge_v, manager));
   }
   throw std::runtime_error("Incorrect type in conversion");
 }
diff --git a/src/query/v2/interpret/awesome_memgraph_functions.cpp b/src/query/v2/interpret/awesome_memgraph_functions.cpp
index 46b0fb9fe..8768736b2 100644
--- a/src/query/v2/interpret/awesome_memgraph_functions.cpp
+++ b/src/query/v2/interpret/awesome_memgraph_functions.cpp
@@ -22,8 +22,8 @@
 
 #include "query/v2/bindings/typed_value.hpp"
 #include "query/v2/conversions.hpp"
-#include "query/v2/db_accessor.hpp"
 #include "query/v2/exceptions.hpp"
+#include "query/v2/shard_request_manager.hpp"
 #include "storage/v3/conversions.hpp"
 #include "utils/string.hpp"
 #include "utils/temporal.hpp"
diff --git a/src/query/v2/interpret/awesome_memgraph_functions.hpp b/src/query/v2/interpret/awesome_memgraph_functions.hpp
index d15ef5a9b..134f05d7d 100644
--- a/src/query/v2/interpret/awesome_memgraph_functions.hpp
+++ b/src/query/v2/interpret/awesome_memgraph_functions.hpp
@@ -16,12 +16,15 @@
 #include <unordered_map>
 
 #include "query/v2/bindings/typed_value.hpp"
+#include "query/v2/db_accessor.hpp"
 #include "storage/v3/view.hpp"
 #include "utils/memory.hpp"
 
-namespace memgraph::query::v2 {
+namespace memgraph::msgs {
+class ShardRequestManagerInterface;
+} // namespace memgraph::msgs
 
-class DbAccessor;
+namespace memgraph::query::v2 {
 
 namespace {
 const char kStartsWith[] = "STARTSWITH";
@@ -31,7 +34,9 @@ const char kId[] = "ID";
 }  // namespace
 
 struct FunctionContext {
-  DbAccessor *db_accessor;
+  // TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage.
+  // DbAccessor *db_accessor;
+  msgs::ShardRequestManagerInterface *manager;
   utils::MemoryResource *memory;
   int64_t timestamp;
   std::unordered_map<std::string, int64_t> *counters;
diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp
index c02497030..a1637b4c3 100644
--- a/src/query/v2/interpreter.cpp
+++ b/src/query/v2/interpreter.cpp
@@ -144,7 +144,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler {
 /// @throw QueryRuntimeException if an error ocurred.
 
 Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters &parameters,
-                         DbAccessor *db_accessor) {
+                         msgs::ShardRequestManagerInterface *manager) {
   // Empty frame for evaluation of password expression. This is OK since
   // password should be either null or string literal and it's evaluation
   // should not depend on frame.
@@ -155,7 +155,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
   // the argument to Callback.
   evaluation_context.timestamp = QueryTimestamp();
   evaluation_context.parameters = parameters;
-  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
+  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
 
   std::string username = auth_query->user_;
   std::string rolename = auth_query->role_;
@@ -313,7 +313,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
 }
 
 Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &parameters,
-                                InterpreterContext *interpreter_context, DbAccessor *db_accessor,
+                                InterpreterContext *interpreter_context, msgs::ShardRequestManagerInterface *manager,
                                 std::vector<Notification> *notifications) {
   expr::Frame<TypedValue> frame(0);
   SymbolTable symbol_table;
@@ -322,7 +322,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
   // the argument to Callback.
   evaluation_context.timestamp = QueryTimestamp();
   evaluation_context.parameters = parameters;
-  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
+  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
 
   Callback callback;
   switch (repl_query->action_) {
@@ -448,7 +448,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
   }
 }
 
-Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &parameters, DbAccessor *db_accessor) {
+Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &parameters,
+                            msgs::ShardRequestManagerInterface *manager) {
   expr::Frame<TypedValue> frame(0);
   SymbolTable symbol_table;
   EvaluationContext evaluation_context;
@@ -458,7 +459,7 @@ Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &param
       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
           .count();
   evaluation_context.parameters = parameters;
-  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD);
+  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD);
 
   Callback callback;
   switch (setting_query->action_) {
@@ -886,7 +887,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
   EvaluationContext evaluation_context;
   evaluation_context.timestamp = QueryTimestamp();
   evaluation_context.parameters = parsed_query.parameters;
-  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD);
+  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager,
+                                storage::v3::View::OLD);
   const auto memory_limit =
       expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
   if (memory_limit) {
@@ -901,7 +903,6 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
         "convert the parsed row values to the appropriate type. This can be done using the built-in "
         "conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
   }
-  shard_request_manager->StartTransaction();
   auto plan = CypherQueryToPlan(
       parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, parsed_query.parameters,
       parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, shard_request_manager);
@@ -957,10 +958,10 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
   auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
   MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN");
 
-  auto cypher_query_plan =
-      CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage),
-                        cypher_query, parsed_inner_query.parameters,
-                        parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, nullptr);
+  auto cypher_query_plan = CypherQueryToPlan(
+      parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query,
+      parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr,
+      shard_request_manager);
 
   std::stringstream printed_plan;
   plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan);
@@ -1030,7 +1031,8 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
   EvaluationContext evaluation_context;
   evaluation_context.timestamp = QueryTimestamp();
   evaluation_context.parameters = parsed_inner_query.parameters;
-  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD);
+  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager,
+                                storage::v3::View::OLD);
   const auto memory_limit =
       expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
 
@@ -1179,14 +1181,15 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
 
 PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
                                std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
-                               DbAccessor *dba, utils::MemoryResource *execution_memory) {
+                               DbAccessor *dba, utils::MemoryResource *execution_memory,
+                               msgs::ShardRequestManagerInterface *manager) {
   if (in_explicit_transaction) {
     throw UserModificationInMulticommandTxException();
   }
 
   auto *auth_query = utils::Downcast<AuthQuery>(parsed_query.query);
 
-  auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, dba);
+  auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, manager);
 
   SymbolTable symbol_table;
   std::vector<Symbol> output_symbols;
@@ -1215,14 +1218,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa
 
 PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
                                       std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
-                                      DbAccessor *dba) {
+                                      msgs::ShardRequestManagerInterface *manager) {
   if (in_explicit_transaction) {
     throw ReplicationModificationInMulticommandTxException();
   }
 
   auto *replication_query = utils::Downcast<ReplicationQuery>(parsed_query.query);
   auto callback =
-      HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba, notifications);
+      HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, manager, notifications);
 
   return PreparedQuery{callback.header, std::move(parsed_query.required_privileges),
                        [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
@@ -1310,14 +1313,15 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
   throw SemanticException("CreateSnapshot query is not supported!");
 }
 
-PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, DbAccessor *dba) {
+PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
+                                  msgs::ShardRequestManagerInterface *manager) {
   if (in_explicit_transaction) {
     throw SettingConfigInMulticommandTxException{};
   }
 
   auto *setting_query = utils::Downcast<SettingQuery>(parsed_query.query);
   MG_ASSERT(setting_query);
-  auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, dba);
+  auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, manager);
 
   return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
                        [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr<PullPlanVector>{nullptr}](
@@ -1511,6 +1515,11 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
     ParsedQuery parsed_query =
         ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query);
     query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count();
+    if (!in_explicit_transaction_ &&
+        (utils::Downcast<CypherQuery>(parsed_query.query) || utils::Downcast<ExplainQuery>(parsed_query.query) ||
+         utils::Downcast<ProfileQuery>(parsed_query.query))) {
+      shard_request_manager_->StartTransaction();
+    }
 
     utils::Timer planning_timer;
     PreparedQuery prepared_query;
@@ -1533,9 +1542,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
       prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
                                          &query_execution->notifications, interpreter_context_);
     } else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
-      prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
-                                        interpreter_context_, &*execution_db_accessor_,
-                                        &query_execution->execution_memory_with_exception);
+      prepared_query = PrepareAuthQuery(
+          std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_,
+          &*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get());
     } else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
       prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
                                         interpreter_context_, interpreter_context_->db,
@@ -1546,7 +1555,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
     } else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) {
       prepared_query =
           PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
-                                  interpreter_context_, &*execution_db_accessor_);
+                                  interpreter_context_, shard_request_manager_.get());
     } else if (utils::Downcast<LockPathQuery>(parsed_query.query)) {
       prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
                                             &*execution_db_accessor_);
@@ -1563,7 +1572,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
       prepared_query =
           PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
     } else if (utils::Downcast<SettingQuery>(parsed_query.query)) {
-      prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_);
+      prepared_query =
+          PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, shard_request_manager_.get());
     } else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
       prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
     } else if (utils::Downcast<SchemaQuery>(parsed_query.query)) {
diff --git a/src/query/v2/interpreter.hpp b/src/query/v2/interpreter.hpp
index 74484ded9..dc413ef44 100644
--- a/src/query/v2/interpreter.hpp
+++ b/src/query/v2/interpreter.hpp
@@ -296,6 +296,8 @@ class Interpreter final {
    */
   void Abort();
 
+  const msgs::ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); }
+
  private:
   struct QueryExecution {
     std::optional<PreparedQuery> prepared_query;
diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp
index 4243c6bd1..0832c60cf 100644
--- a/src/query/v2/plan/operator.cpp
+++ b/src/query/v2/plan/operator.cpp
@@ -169,6 +169,7 @@ class DistributedCreateNodeCursor : public Cursor {
     if (input_cursor_->Pull(frame, context)) {
       auto &shard_manager = context.shard_request_manager;
       shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
+      PlaceNodeOnTheFrame(frame, context);
       return true;
     }
 
@@ -179,8 +180,19 @@ class DistributedCreateNodeCursor : public Cursor {
 
   void Reset() override { state_ = {}; }
 
-  std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) const {
+  void PlaceNodeOnTheFrame(Frame &frame, ExecutionContext &context) {
+    // TODO(kostasrim) Make this work with batching
+    const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]};
+    msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])};
+    frame[nodes_info_.front()->symbol] = TypedValue(
+        query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.shard_request_manager));
+  }
+
+  std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) {
     std::vector<msgs::NewVertex> requests;
+    // TODO(kostasrim) this assertion should be removed once we support multiple vertex creation
+    MG_ASSERT(nodes_info_.size() == 1);
+    msgs::PrimaryKey pk;
     for (const auto &node_info : nodes_info_) {
       msgs::NewVertex rqst;
       MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label");
@@ -188,17 +200,14 @@ class DistributedCreateNodeCursor : public Cursor {
       // TODO(jbajic) Fix properties not send,
       // suggestion: ignore distinction between properties and primary keys
       // since schema validation is done on storage side
-      std::map<msgs::PropertyId, msgs::Value> properties;
       ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr,
                                     storage::v3::View::NEW);
       if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
         for (const auto &[key, value_expression] : *node_info_properties) {
           TypedValue val = value_expression->Accept(evaluator);
-
           if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) {
             rqst.primary_key.push_back(TypedValueToValue(val));
-          } else {
-            properties[key] = TypedValueToValue(val);
+            pk.push_back(TypedValueToValue(val));
           }
         }
       } else {
@@ -207,9 +216,8 @@ class DistributedCreateNodeCursor : public Cursor {
           auto key_str = std::string(key);
           auto property_id = context.shard_request_manager->NameToProperty(key_str);
           if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) {
-            rqst.primary_key.push_back(storage::v3::TypedValueToValue(value));
-          } else {
-            properties[property_id] = TypedValueToValue(value);
+            rqst.primary_key.push_back(TypedValueToValue(value));
+            pk.push_back(TypedValueToValue(value));
           }
         }
       }
@@ -219,14 +227,18 @@ class DistributedCreateNodeCursor : public Cursor {
       }
       // TODO(kostasrim) Copy non primary labels as well
       rqst.label_ids.push_back(msgs::Label{.id = primary_label});
+      src_vertex_props_.push_back(rqst.properties);
       requests.push_back(std::move(rqst));
     }
+    primary_keys_.push_back(std::move(pk));
     return requests;
   }
 
  private:
   const UniqueCursorPtr input_cursor_;
   std::vector<const NodeCreationInfo *> nodes_info_;
+  std::vector<std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>> src_vertex_props_;
+  std::vector<msgs::PrimaryKey> primary_keys_;
   msgs::ExecutionState<msgs::CreateVerticesRequest> state_;
 };
 
@@ -687,7 +699,7 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) {
 
   // Like all filters, newly set values should not affect filtering of old
   // nodes and edges.
-  ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
+  ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.shard_request_manager,
                                 storage::v3::View::OLD);
   while (input_cursor_->Pull(frame, context)) {
     if (EvaluateFilter(evaluator, self_.expression_)) return true;
@@ -728,8 +740,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
 
   if (input_cursor_->Pull(frame, context)) {
     // Produce should always yield the latest results.
-    ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
-                                  storage::v3::View::NEW);
+    ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
+                                  context.shard_request_manager, storage::v3::View::NEW);
     for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator);
 
     return true;
@@ -1149,8 +1161,8 @@ class AggregateCursor : public Cursor {
    * aggregation results, and not on the number of inputs.
    */
   void ProcessAll(Frame *frame, ExecutionContext *context) {
-    ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->db_accessor,
-                                  storage::v3::View::NEW);
+    ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context,
+                                  context->shard_request_manager, storage::v3::View::NEW);
     while (input_cursor_->Pull(*frame, *context)) {
       ProcessOne(*frame, &evaluator);
     }
@@ -1370,8 +1382,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) {
       // First successful pull from the input, evaluate the skip expression.
       // The skip expression doesn't contain identifiers so graph view
       // parameter is not important.
-      ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
-                                    storage::v3::View::OLD);
+      ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
+                                    context.shard_request_manager, storage::v3::View::OLD);
       TypedValue to_skip = self_.expression_->Accept(evaluator);
       if (to_skip.type() != TypedValue::Type::Int)
         throw QueryRuntimeException("Number of elements to skip must be an integer.");
@@ -1425,8 +1437,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) {
   if (limit_ == -1) {
     // Limit expression doesn't contain identifiers so graph view is not
     // important.
-    ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
-                                  storage::v3::View::OLD);
+    ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
+                                  context.shard_request_manager, storage::v3::View::OLD);
     TypedValue limit = self_.expression_->Accept(evaluator);
     if (limit.type() != TypedValue::Type::Int)
       throw QueryRuntimeException("Limit on number of returned elements must be an integer.");
@@ -1481,8 +1493,8 @@ class OrderByCursor : public Cursor {
     SCOPED_PROFILE_OP("OrderBy");
 
     if (!did_pull_all_) {
-      ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
-                                    storage::v3::View::OLD);
+      ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
+                                    context.shard_request_manager, storage::v3::View::OLD);
       auto *mem = cache_.get_allocator().GetMemoryResource();
       while (input_cursor_->Pull(frame, context)) {
         // collect the order_by elements
@@ -1739,8 +1751,8 @@ class UnwindCursor : public Cursor {
         if (!input_cursor_->Pull(frame, context)) return false;
 
         // successful pull from input, initialize value and iterator
-        ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
-                                      storage::v3::View::OLD);
+        ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
+                                      context.shard_request_manager, storage::v3::View::OLD);
         TypedValue input_value = self_.input_expression_->Accept(evaluator);
         if (input_value.type() != TypedValue::Type::List)
           throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
@@ -2217,7 +2229,7 @@ class LoadCsvCursor : public Cursor {
     //  self_->delimiter_, and self_->quote_ earlier (say, in the interpreter.cpp)
     //  without massacring the code even worse than I did here
     if (UNLIKELY(!reader_)) {
-      reader_ = MakeReader(&context.evaluation_context);
+      reader_ = MakeReader(context);
     }
 
     bool input_pulled = input_cursor_->Pull(frame, context);
@@ -2246,11 +2258,12 @@ class LoadCsvCursor : public Cursor {
   void Shutdown() override { input_cursor_->Shutdown(); }
 
  private:
-  csv::Reader MakeReader(EvaluationContext *eval_context) {
+  csv::Reader MakeReader(ExecutionContext &context) {
+    auto &eval_context = context.evaluation_context;
     Frame frame(0);
     SymbolTable symbol_table;
-    DbAccessor *dba = nullptr;
-    auto evaluator = ExpressionEvaluator(&frame, symbol_table, *eval_context, dba, storage::v3::View::OLD);
+    auto evaluator =
+        ExpressionEvaluator(&frame, symbol_table, eval_context, context.shard_request_manager, storage::v3::View::OLD);
 
     auto maybe_file = ToOptionalString(&evaluator, self_->file_);
     auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_);
@@ -2287,8 +2300,8 @@ class ForeachCursor : public Cursor {
       return false;
     }
 
-    ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
-                                  storage::v3::View::NEW);
+    ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
+                                  context.shard_request_manager, storage::v3::View::NEW);
     TypedValue expr_result = expression->Accept(evaluator);
 
     if (expr_result.IsNull()) {
@@ -2458,15 +2471,51 @@ class DistributedExpandCursor : public Cursor {
       : self_(self),
         input_cursor_(self.input_->MakeCursor(mem)),
         current_in_edge_it_(current_in_edges_.begin()),
-        current_out_edge_it_(current_out_edges_.begin()) {
-    if (self_.common_.existing_node) {
-      throw QueryRuntimeException("Cannot use existing node with DistributedExpandOne cursor!");
-    }
-  }
+        current_out_edge_it_(current_out_edges_.begin()) {}
 
   using VertexAccessor = accessors::VertexAccessor;
   using EdgeAccessor = accessors::EdgeAccessor;
 
+  static constexpr auto DirectionToMsgsDirection(const auto direction) {
+    switch (direction) {
+      case EdgeAtom::Direction::IN:
+        return msgs::EdgeDirection::IN;
+      case EdgeAtom::Direction::OUT:
+        return msgs::EdgeDirection::OUT;
+      case EdgeAtom::Direction::BOTH:
+        return msgs::EdgeDirection::BOTH;
+    }
+  };
+
+  void PullDstVertex(Frame &frame, ExecutionContext &context, const EdgeAtom::Direction direction) {
+    if (self_.common_.existing_node) {
+      return;
+    }
+    MG_ASSERT(direction != EdgeAtom::Direction::BOTH);
+    const auto &edge = frame[self_.common_.edge_symbol].ValueEdge();
+    static auto get_dst_vertex = [&edge](const EdgeAtom::Direction direction) {
+      switch (direction) {
+        case EdgeAtom::Direction::IN:
+          return edge.From().Id();
+        case EdgeAtom::Direction::OUT:
+          return edge.To().Id();
+        case EdgeAtom::Direction::BOTH:
+          throw std::runtime_error("EdgeDirection Both not implemented");
+      }
+    };
+    msgs::ExpandOneRequest request;
+    // to not fetch any properties of the edges
+    request.edge_properties.emplace();
+    request.src_vertices.push_back(get_dst_vertex(direction));
+    request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
+    msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
+    auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
+    MG_ASSERT(result_rows.size() == 1);
+    auto &result_row = result_rows.front();
+    frame[self_.common_.node_symbol] = accessors::VertexAccessor(
+        msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.shard_request_manager);
+  }
+
   bool InitEdges(Frame &frame, ExecutionContext &context) {
     // Input Vertex could be null if it is created by a failed optional match. In
     // those cases we skip that input pull and continue with the next.
@@ -2480,44 +2529,41 @@ class DistributedExpandCursor : public Cursor {
 
       ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
       auto &vertex = vertex_value.ValueVertex();
-      static constexpr auto direction_to_msgs_direction = [](const EdgeAtom::Direction direction) {
-        switch (direction) {
-          case EdgeAtom::Direction::IN:
-            return msgs::EdgeDirection::IN;
-          case EdgeAtom::Direction::OUT:
-            return msgs::EdgeDirection::OUT;
-          case EdgeAtom::Direction::BOTH:
-            return msgs::EdgeDirection::BOTH;
-        }
-      };
-
       msgs::ExpandOneRequest request;
-      request.direction = direction_to_msgs_direction(self_.common_.direction);
+      request.direction = DirectionToMsgsDirection(self_.common_.direction);
       // to not fetch any properties of the edges
       request.edge_properties.emplace();
       request.src_vertices.push_back(vertex.Id());
       msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
       auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
-      MG_ASSERT(result_rows.size() == 1);
       auto &result_row = result_rows.front();
 
-      const auto convert_edges = [&vertex](
+      if (self_.common_.existing_node) {
+        const auto &node = frame[self_.common_.node_symbol].ValueVertex().Id();
+        auto &in = result_row.in_edges_with_specific_properties;
+        std::erase_if(in, [&node](auto &edge) { return edge.other_end != node; });
+        auto &out = result_row.out_edges_with_specific_properties;
+        std::erase_if(out, [&node](auto &edge) { return edge.other_end != node; });
+      }
+
+      const auto convert_edges = [&vertex, &context](
                                      std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
                                      const EdgeAtom::Direction direction) {
         std::vector<EdgeAccessor> edge_accessors;
         edge_accessors.reserve(edge_messages.size());
+
         switch (direction) {
           case EdgeAtom::Direction::IN: {
             for (auto &edge : edge_messages) {
-              edge_accessors.emplace_back(
-                  msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type});
+              edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type},
+                                          context.shard_request_manager);
             }
             break;
           }
           case EdgeAtom::Direction::OUT: {
             for (auto &edge : edge_messages) {
-              edge_accessors.emplace_back(
-                  msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type});
+              edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type},
+                                          context.shard_request_manager);
             }
             break;
           }
@@ -2527,12 +2573,13 @@ class DistributedExpandCursor : public Cursor {
         }
         return edge_accessors;
       };
+
       current_in_edges_ =
           convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::IN);
       current_in_edge_it_ = current_in_edges_.begin();
-      current_in_edges_ =
-          convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::OUT);
-      current_in_edge_it_ = current_in_edges_.begin();
+      current_out_edges_ =
+          convert_edges(std::move(result_row.out_edges_with_specific_properties), EdgeAtom::Direction::OUT);
+      current_out_edge_it_ = current_out_edges_.begin();
       return true;
     }
   }
@@ -2540,19 +2587,6 @@ class DistributedExpandCursor : public Cursor {
   bool Pull(Frame &frame, ExecutionContext &context) override {
     SCOPED_PROFILE_OP("DistributedExpand");
     // A helper function for expanding a node from an edge.
-    auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) {
-      if (self_.common_.existing_node) return;
-      switch (direction) {
-        case EdgeAtom::Direction::IN:
-          frame[self_.common_.node_symbol] = new_edge.From();
-          break;
-        case EdgeAtom::Direction::OUT:
-          frame[self_.common_.node_symbol] = new_edge.To();
-          break;
-        case EdgeAtom::Direction::BOTH:
-          LOG_FATAL("Must indicate exact expansion direction here");
-      }
-    };
 
     while (true) {
       if (MustAbort(context)) throw HintedAbortError();
@@ -2561,7 +2595,7 @@ class DistributedExpandCursor : public Cursor {
         auto &edge = *current_in_edge_it_;
         ++current_in_edge_it_;
         frame[self_.common_.edge_symbol] = edge;
-        pull_node(edge, EdgeAtom::Direction::IN);
+        PullDstVertex(frame, context, EdgeAtom::Direction::IN);
         return true;
       }
 
@@ -2573,7 +2607,7 @@ class DistributedExpandCursor : public Cursor {
           continue;
         };
         frame[self_.common_.edge_symbol] = edge;
-        pull_node(edge, EdgeAtom::Direction::OUT);
+        PullDstVertex(frame, context, EdgeAtom::Direction::OUT);
         return true;
       }
 
diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp
index 49ae346cb..2ea4efb57 100644
--- a/src/query/v2/requests.hpp
+++ b/src/query/v2/requests.hpp
@@ -387,7 +387,6 @@ struct GetPropertiesResponse {
 enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };
 
 struct ExpandOneRequest {
-  // TODO(antaljanosbenjamin): Filtering based on the id of the other end of the edge?
   Hlc transaction_id;
   std::vector<VertexId> src_vertices;
   // return types that type is in this list
diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp
index 79794aa1a..a73201046 100644
--- a/src/query/v2/shard_request_manager.hpp
+++ b/src/query/v2/shard_request_manager.hpp
@@ -171,6 +171,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
 
     if (hlc_response.fresher_shard_map) {
       shards_map_ = hlc_response.fresher_shard_map.value();
+      SetUpNameIdMappers();
     }
   }
 
@@ -186,6 +187,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
 
     if (hlc_response.fresher_shard_map) {
       shards_map_ = hlc_response.fresher_shard_map.value();
+      SetUpNameIdMappers();
     }
     auto commit_timestamp = hlc_response.new_hlc;
 
@@ -223,14 +225,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
     return shards_map_.GetLabelId(name).value();
   }
 
-  const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override {
-    return shards_map_.GetPropertyName(prop);
+  const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override {
+    return properties_.IdToName(id.AsUint());
   }
-  const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override {
-    return shards_map_.GetLabelName(label);
+  const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override {
+    return labels_.IdToName(id.AsUint());
   }
-  const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override {
-    return shards_map_.GetEdgeTypeName(type);
+  const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override {
+    return edge_types_.IdToName(id.AsUint());
   }
 
   bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
@@ -358,7 +360,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
     std::vector<VertexAccessor> accessors;
     for (auto &response : responses) {
       for (auto &result_row : response.results) {
-        accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props)));
+        accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this));
       }
     }
     return accessors;
@@ -697,7 +699,28 @@ class ShardRequestManager : public ShardRequestManagerInterface {
     }
   }
 
+  void SetUpNameIdMappers() {
+    std::unordered_map<uint64_t, std::string> id_to_name;
+    for (const auto &[name, id] : shards_map_.labels) {
+      id_to_name.emplace(id.AsUint(), name);
+    }
+    labels_.StoreMapping(std::move(id_to_name));
+    id_to_name.clear();
+    for (const auto &[name, id] : shards_map_.properties) {
+      id_to_name.emplace(id.AsUint(), name);
+    }
+    properties_.StoreMapping(std::move(id_to_name));
+    id_to_name.clear();
+    for (const auto &[name, id] : shards_map_.edge_types) {
+      id_to_name.emplace(id.AsUint(), name);
+    }
+    edge_types_.StoreMapping(std::move(id_to_name));
+  }
+
   ShardMap shards_map_;
+  storage::v3::NameIdMapper properties_;
+  storage::v3::NameIdMapper edge_types_;
+  storage::v3::NameIdMapper labels_;
   CoordinatorClient coord_cli_;
   RsmStorageClientManager<StorageClient> storage_cli_manager_;
   memgraph::io::Io<TTransport> io_;
diff --git a/src/storage/v3/name_id_mapper.hpp b/src/storage/v3/name_id_mapper.hpp
index 94ed62625..f241bca54 100644
--- a/src/storage/v3/name_id_mapper.hpp
+++ b/src/storage/v3/name_id_mapper.hpp
@@ -53,6 +53,10 @@ class NameIdMapper final {
     return it->second;
   }
 
+  const auto &GetIdToNameMap() const { return id_to_name_; }
+
+  const auto &GetNameToIdMap() const { return name_to_id_; }
+
  private:
   // Necessary for comparison with string_view nad string
   // https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0919r1.html
diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp
index a875e0164..9a124b250 100644
--- a/src/storage/v3/shard_rsm.cpp
+++ b/src/storage/v3/shard_rsm.cpp
@@ -17,6 +17,7 @@
 
 #include "parser/opencypher/parser.hpp"
 #include "query/v2/requests.hpp"
+#include "storage/v2/view.hpp"
 #include "storage/v3/bindings/ast/ast.hpp"
 #include "storage/v3/bindings/cypher_main_visitor.hpp"
 #include "storage/v3/bindings/db_accessor.hpp"
@@ -113,6 +114,24 @@ std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor
   return ret;
 }
 
+std::optional<std::map<PropertyId, Value>> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view,
+                                                                   const Schemas::Schema *schema) {
+  std::map<PropertyId, Value> ret;
+  auto props = acc.Properties(view);
+  auto maybe_pk = acc.PrimaryKey(view);
+  if (maybe_pk.HasError()) {
+    spdlog::debug("Encountered an error while trying to get vertex primary key.");
+    return std::nullopt;
+  }
+  auto &pk = maybe_pk.GetValue();
+  MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
+  for (size_t i{0}; i < schema->second.size(); ++i) {
+    ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
+  }
+
+  return ret;
+}
+
 std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view,
                                                                             const Schemas::Schema *schema) {
   std::map<PropertyId, Value> ret;
@@ -129,17 +148,9 @@ std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(cons
                  });
   properties.clear();
 
-  // TODO(antaljanosbenjamin): Once the VertexAccessor::Properties returns also the primary keys, we can get rid of this
-  // code.
-  auto maybe_pk = acc.PrimaryKey(view);
-  if (maybe_pk.HasError()) {
-    spdlog::debug("Encountered an error while trying to get vertex primary key.");
-  }
-
-  auto &pk = maybe_pk.GetValue();
-  MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!");
-  for (size_t i{0}; i < schema->second.size(); ++i) {
-    ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i])));
+  auto pks = PrimaryKeysFromAccessor(acc, view, schema);
+  if (pks) {
+    ret.merge(*pks);
   }
 
   return ret;
@@ -190,7 +201,9 @@ std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccesso
 }
 
 std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
-                                                                        const msgs::ExpandOneRequest &req) {
+                                                                        const msgs::ExpandOneRequest &req,
+                                                                        storage::v3::View view,
+                                                                        const Schemas::Schema *schema) {
   std::map<PropertyId, Value> src_vertex_properties;
 
   if (!req.src_vertex_properties) {
@@ -204,6 +217,10 @@ std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const st
     for (auto &[key, val] : props.GetValue()) {
       src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val))));
     }
+    auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema);
+    if (pks) {
+      src_vertex_properties.merge(*pks);
+    }
 
   } else if (req.src_vertex_properties.value().empty()) {
     // NOOP
@@ -264,7 +281,6 @@ std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges(
         return std::nullopt;
       }
       in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN);
-
       auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types);
       if (out_edges_result.HasError()) {
         spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}",
@@ -303,7 +319,8 @@ bool FillEdges(const std::vector<EdgeAccessor> &edges, msgs::ExpandOneResultRow
 
 std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
     Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
-    const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) {
+    const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
+    const Schemas::Schema *schema) {
   /// Fill up source vertex
   const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
   auto v_acc = acc.FindVertex(primary_key, View::NEW);
@@ -312,9 +329,9 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
   if (!source_vertex) {
     return std::nullopt;
   }
+  std::optional<std::map<PropertyId, Value>> src_vertex_properties;
+  src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
 
-  /// Fill up source vertex properties
-  auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req);
   if (!src_vertex_properties) {
     return std::nullopt;
   }
@@ -852,7 +869,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
         continue;
       }
     }
-    auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
+    auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler,
+                                     shard_->GetSchema(shard_->PrimaryLabel()));
 
     if (!result) {
       action_successful = false;
diff --git a/src/utils/print_helpers.hpp b/src/utils/print_helpers.hpp
index eeb18e061..05c07fdc2 100644
--- a/src/utils/print_helpers.hpp
+++ b/src/utils/print_helpers.hpp
@@ -33,8 +33,9 @@ std::ostream &operator<<(std::ostream &in, const std::vector<T> &vector) {
   return in;
 }
 
-template <typename K, typename V>
-std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
+namespace detail {
+template <typename T>
+std::ostream &MapImpl(std::ostream &in, const T &map) {
   in << "{";
   bool first = true;
   for (const auto &[a, b] : map) {
@@ -49,6 +50,17 @@ std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
   in << "}";
   return in;
 }
+}  // namespace detail
+
+template <typename K, typename V>
+std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
+  return detail::MapImpl(in, map);
+}
+
+template <typename K, typename V, typename THash, typename Cmp>
+std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V, THash, Cmp> &map) {
+  return detail::MapImpl(in, map);
+}
 
 template <typename K, typename V>
 std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V> &map) {
diff --git a/tests/e2e/distributed_queries/CMakeLists.txt b/tests/e2e/distributed_queries/CMakeLists.txt
index 16dc84ab6..9a2c48c63 100644
--- a/tests/e2e/distributed_queries/CMakeLists.txt
+++ b/tests/e2e/distributed_queries/CMakeLists.txt
@@ -3,3 +3,8 @@ function(distributed_queries_e2e_python_files FILE_NAME)
 endfunction()
 
 distributed_queries_e2e_python_files(distributed_queries.py)
+distributed_queries_e2e_python_files(unwind_collect.py)
+distributed_queries_e2e_python_files(order_by_and_limit.py)
+distributed_queries_e2e_python_files(distinct.py)
+distributed_queries_e2e_python_files(optional_match.py)
+distributed_queries_e2e_python_files(common.py)
diff --git a/tests/e2e/distributed_queries/common.py b/tests/e2e/distributed_queries/common.py
new file mode 100644
index 000000000..3588a803c
--- /dev/null
+++ b/tests/e2e/distributed_queries/common.py
@@ -0,0 +1,44 @@
+# Copyright 2022 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import typing
+import mgclient
+import sys
+import pytest
+import time
+
+
+@pytest.fixture(autouse=True)
+def connection():
+    connection = connect()
+    return connection
+
+
+def connect(**kwargs) -> mgclient.Connection:
+    connection = mgclient.connect(host="localhost", port=7687, **kwargs)
+    connection.autocommit = True
+    return connection
+
+
+def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
+    cursor.execute(query, params)
+    return cursor.fetchall()
+
+
+def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
+    results = execute_and_fetch_all(cursor, query)
+    return len(results) == n
+
+
+def wait_for_shard_manager_to_initialize():
+    # The ShardManager in memgraph takes some time to initialize
+    # the shards, thus we cannot just run the queries right away
+    time.sleep(3)
diff --git a/tests/e2e/distributed_queries/distinct.py b/tests/e2e/distributed_queries/distinct.py
new file mode 100644
index 000000000..9ebe50e6f
--- /dev/null
+++ b/tests/e2e/distributed_queries/distinct.py
@@ -0,0 +1,38 @@
+# Copyright 2022 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import typing
+import mgclient
+import sys
+import pytest
+import time
+from common import *
+
+
+def test_distinct(connection):
+    wait_for_shard_manager_to_initialize()
+    cursor = connection.cursor()
+
+    assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0)
+    assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
+    assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
+    assert has_n_result_row(cursor, "MATCH (n)-[r]->(m) RETURN r", 4)
+
+    results = execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN DISTINCT m")
+    assert len(results) == 2
+    for i, n in enumerate(results):
+        n_props = n[0].properties
+        assert len(n_props) == 1
+        assert n_props["property"] == i
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/distributed_queries/distributed_queries.py b/tests/e2e/distributed_queries/distributed_queries.py
index 68ebdef8d..b4b6324d9 100644
--- a/tests/e2e/distributed_queries/distributed_queries.py
+++ b/tests/e2e/distributed_queries/distributed_queries.py
@@ -14,34 +14,7 @@ import mgclient
 import sys
 import pytest
 import time
-
-
-@pytest.fixture(autouse=True)
-def connection():
-    connection = connect()
-    return connection
-
-
-def connect(**kwargs) -> mgclient.Connection:
-    connection = mgclient.connect(host="localhost", port=7687, **kwargs)
-    connection.autocommit = True
-    return connection
-
-
-def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
-    cursor.execute(query, params)
-    return cursor.fetchall()
-
-
-def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
-    results = execute_and_fetch_all(cursor, query)
-    return len(results) == n
-
-
-def wait_for_shard_manager_to_initialize():
-    # The ShardManager in memgraph takes some time to initialize
-    # the shards, thus we cannot just run the queries right away
-    time.sleep(3)
+from common import *
 
 
 def test_vertex_creation_and_scanall(connection):
@@ -62,7 +35,7 @@ def test_vertex_creation_and_scanall(connection):
     assert len(results) == 9
     for (n, r, m) in results:
         n_props = n.properties
-        assert len(n_props) == 0, "n is not expected to have properties, update the test!"
+        assert len(n_props) == 1, "n is not expected to have properties, update the test!"
         assert len(n.labels) == 0, "n is not expected to have labels, update the test!"
 
         assert r.type == "TO"
diff --git a/tests/e2e/distributed_queries/optional_match.py b/tests/e2e/distributed_queries/optional_match.py
new file mode 100644
index 000000000..731c36181
--- /dev/null
+++ b/tests/e2e/distributed_queries/optional_match.py
@@ -0,0 +1,37 @@
+# Copyright 2022 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import typing
+import mgclient
+import sys
+import pytest
+import time
+from common import *
+
+
+def test_optional_match(connection):
+    wait_for_shard_manager_to_initialize()
+    cursor = connection.cursor()
+
+    assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0)
+
+    results = execute_and_fetch_all(
+        cursor, "MATCH (n:label) OPTIONAL MATCH (n:label)-[:TO]->(parent:label) RETURN parent"
+    )
+    assert len(results) == 1
+
+    assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
+    assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0)
+    assert has_n_result_row(cursor, "MATCH (n:label) OPTIONAL MATCH (n)-[r:TO]->(m:label) RETURN r", 4)
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/distributed_queries/order_by_and_limit.py b/tests/e2e/distributed_queries/order_by_and_limit.py
new file mode 100644
index 000000000..05297f8f6
--- /dev/null
+++ b/tests/e2e/distributed_queries/order_by_and_limit.py
@@ -0,0 +1,44 @@
+# Copyright 2022 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import typing
+import mgclient
+import sys
+import pytest
+import time
+from common import *
+
+
+def test_order_by_and_limit(connection):
+    wait_for_shard_manager_to_initialize()
+    cursor = connection.cursor()
+
+    assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
+    assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
+    assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
+    assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0)
+
+    results = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property DESC")
+    assert len(results) == 4
+    i = 4
+    for n in results:
+        n_props = n[0].properties
+        assert len(n_props) == 1
+        assert n_props["property"] == i
+        i = i - 1
+
+    result = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property LIMIT 1")
+    assert len(result) == 1
+    assert result[0][0].properties["property"] == 1
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/distributed_queries/unwind_collect.py b/tests/e2e/distributed_queries/unwind_collect.py
new file mode 100644
index 000000000..b51ac9907
--- /dev/null
+++ b/tests/e2e/distributed_queries/unwind_collect.py
@@ -0,0 +1,34 @@
+# Copyright 2022 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import typing
+import mgclient
+import sys
+import pytest
+import time
+from common import *
+
+
+def test_collect_unwind(connection):
+    wait_for_shard_manager_to_initialize()
+    cursor = connection.cursor()
+
+    assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0)
+    assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0)
+    assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0)
+    assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0)
+
+    assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS result RETURN result", 1)
+    assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS nd UNWIND nd AS result RETURN result", 4)
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/distributed_queries/workloads.yaml b/tests/e2e/distributed_queries/workloads.yaml
index 083fb91ca..c4ba13500 100644
--- a/tests/e2e/distributed_queries/workloads.yaml
+++ b/tests/e2e/distributed_queries/workloads.yaml
@@ -11,3 +11,23 @@ workloads:
     binary: "tests/e2e/pytest_runner.sh"
     args: ["distributed_queries/distributed_queries.py"]
     <<: *template_cluster
+
+  - name: "Distributed unwind collect"
+    binary: "tests/e2e/pytest_runner.sh"
+    args: ["distributed_queries/unwind_collect.py"]
+    <<: *template_cluster
+
+  - name: "Distributed order by and limit"
+    binary: "tests/e2e/pytest_runner.sh"
+    args: ["distributed_queries/order_by_and_limit.py"]
+    <<: *template_cluster
+
+  - name: "Distributed distinct"
+    binary: "tests/e2e/pytest_runner.sh"
+    args: ["distributed_queries/distinct.py"]
+    <<: *template_cluster
+
+  - name: "Distributed optional match"
+    binary: "tests/e2e/pytest_runner.sh"
+    args: ["distributed_queries/optional_match.py"]
+    <<: *template_cluster
diff --git a/tests/setup.sh b/tests/setup.sh
index a74ee6ff9..4b69bbf1c 100755
--- a/tests/setup.sh
+++ b/tests/setup.sh
@@ -12,7 +12,7 @@ PIP_DEPS=(
    "neo4j-driver==4.1.1"
    "parse==1.18.0"
    "parse-type==0.5.2"
-   "pytest==6.2.3"
+   "pytest==6.2.5"
    "pyyaml==5.4.1"
    "six==1.15.0"
 )
@@ -36,12 +36,12 @@ PYTHON_MINOR=$(python3 -c 'import sys; print(sys.version_info[:][1])')
 # NOTE (2021-11-15): PyPi doesn't contain pulsar-client for Python 3.9 so we have to use
 # our manually built wheel file. When they update the repository, pulsar-client can be
 # added as a regular PIP dependancy
-if [ $PYTHON_MINOR -lt 9 ]; then
-  pip --timeout 1000 install "pulsar-client==2.8.1"
-else
-  pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
-fi
-
+#if [ $PYTHON_MINOR -lt 9 ]; then
+#  pip --timeout 1000 install "pulsar-client==2.8.1"
+#else
+#  pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
+#fi
+#
 for pkg in "${PIP_DEPS[@]}"; do
     pip --timeout 1000 install "$pkg"
 done