From 45521bdba828e090d9947e20fb1af04cbbec4218 Mon Sep 17 00:00:00 2001
From: jbajic <jure.bajic@memgraph.com>
Date: Mon, 23 Jan 2023 18:04:48 +0100
Subject: [PATCH] Add from SplitData

---
 src/storage/v3/shard.cpp              | 34 +++++++++++++------
 src/storage/v3/shard.hpp              | 12 ++++---
 src/storage/v3/splitter.cpp           | 30 +++++++++++------
 src/storage/v3/splitter.hpp           | 27 ++++++++++-----
 tests/unit/storage_v3_shard_split.cpp | 48 ++++++++++++++++++---------
 5 files changed, 100 insertions(+), 51 deletions(-)

diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp
index d26afc76b..566029b9c 100644
--- a/src/storage/v3/shard.cpp
+++ b/src/storage/v3/shard.cpp
@@ -333,15 +333,16 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
       indices_{config.items, vertex_validator_},
       isolation_level_{config.transaction.isolation_level},
       config_{config},
-      shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
+      shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema,
+                      name_id_mapper_) {
   CreateSchema(primary_label_, schema);
   StoreMapping(std::move(id_to_name));
 }
 
 Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
              std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
-             std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config,
-             std::unordered_map<uint64_t, std::string> id_to_name)
+             std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
+             const std::unordered_map<uint64_t, std::string> &id_to_name)
     : primary_label_{primary_label},
       min_primary_key_{min_primary_key},
       max_primary_key_{max_primary_key},
@@ -353,15 +354,16 @@ Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<Pr
       isolation_level_{config.transaction.isolation_level},
       config_{config},
       start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)),
-      shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
+      shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema,
+                      name_id_mapper_) {
   CreateSchema(primary_label_, schema);
-  StoreMapping(std::move(id_to_name));
+  StoreMapping(id_to_name);
 }
 
 Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
              std::vector<SchemaProperty> schema, VertexContainer &&vertices,
-             std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config,
-             std::unordered_map<uint64_t, std::string> id_to_name)
+             std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
+             const std::unordered_map<uint64_t, std::string> &id_to_name)
     : primary_label_{primary_label},
       min_primary_key_{min_primary_key},
       max_primary_key_{max_primary_key},
@@ -372,13 +374,25 @@ Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<Pr
       isolation_level_{config.transaction.isolation_level},
       config_{config},
       start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)),
-      shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
+      shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema,
+                      name_id_mapper_) {
   CreateSchema(primary_label_, schema);
-  StoreMapping(std::move(id_to_name));
+  StoreMapping(id_to_name);
 }
 
 Shard::~Shard() {}
 
+std::unique_ptr<Shard> Shard::FromSplitData(SplitData &&split_data) {
+  if (split_data.config.items.properties_on_edges) [[likely]] {
+    return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.min_primary_key,
+                                   split_data.schema, std::move(split_data.vertices), std::move(*split_data.edges),
+                                   std::move(split_data.transactions), split_data.config, split_data.id_to_name);
+  }
+  return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.min_primary_key,
+                                 split_data.schema, std::move(split_data.vertices), std::move(split_data.transactions),
+                                 split_data.config, split_data.id_to_name);
+}
+
 Shard::Accessor::Accessor(Shard &shard, Transaction &transaction)
     : shard_(&shard), transaction_(&transaction), config_(shard_->config_.items) {}
 
@@ -1096,7 +1110,7 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
   return std::nullopt;
 }
 
-std::unique_ptr<Shard> Shard::PerformSplit(const PrimaryKey &split_key) {
+SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
   return shard_splitter_.SplitShard(split_key, max_primary_key_);
 }
 
diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp
index 146320b7b..02ba7a2da 100644
--- a/src/storage/v3/shard.hpp
+++ b/src/storage/v3/shard.hpp
@@ -199,13 +199,13 @@ class Shard final {
 
   Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
         std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
-        std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config = Config(),
-        std::unordered_map<uint64_t, std::string> id_to_name = {});
+        std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
+        const std::unordered_map<uint64_t, std::string> &id_to_name);
 
   Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
         std::vector<SchemaProperty> schema, VertexContainer &&vertices,
-        std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config = Config(),
-        std::unordered_map<uint64_t, std::string> id_to_name = {});
+        std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
+        const std::unordered_map<uint64_t, std::string> &id_to_name);
 
   Shard(const Shard &) = delete;
   Shard(Shard &&) noexcept = delete;
@@ -213,6 +213,8 @@ class Shard final {
   Shard operator=(Shard &&) noexcept = delete;
   ~Shard();
 
+  static std::unique_ptr<Shard> FromSplitData(SplitData &&split_data);
+
   class Accessor final {
    private:
     friend class Shard;
@@ -379,7 +381,7 @@ class Shard final {
 
   std::optional<SplitInfo> ShouldSplit() const noexcept;
 
-  std::unique_ptr<Shard> PerformSplit(const PrimaryKey &split_key);
+  SplitData PerformSplit(const PrimaryKey &split_key);
 
  private:
   Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
diff --git a/src/storage/v3/splitter.cpp b/src/storage/v3/splitter.cpp
index b9d6304e1..5cbc84f50 100644
--- a/src/storage/v3/splitter.cpp
+++ b/src/storage/v3/splitter.cpp
@@ -20,6 +20,7 @@
 #include "storage/v3/id_types.hpp"
 #include "storage/v3/indices.hpp"
 #include "storage/v3/key_store.hpp"
+#include "storage/v3/name_id_mapper.hpp"
 #include "storage/v3/schemas.hpp"
 #include "storage/v3/shard.hpp"
 #include "storage/v3/transaction.hpp"
@@ -29,30 +30,37 @@ namespace memgraph::storage::v3 {
 
 Splitter::Splitter(const LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges,
                    std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
-                   Config &config, const std::vector<SchemaProperty> &schema)
+                   const Config &config, const std::vector<SchemaProperty> &schema, const NameIdMapper &name_id_mapper)
     : primary_label_(primary_label),
       vertices_(vertices),
       edges_(edges),
       start_logical_id_to_transaction_(start_logical_id_to_transaction),
       indices_(indices),
       config_(config),
-      schema_(schema) {}
+      schema_(schema),
+      name_id_mapper_(name_id_mapper) {}
 
-std::unique_ptr<Shard> Splitter::SplitShard(const PrimaryKey &split_key,
-                                            const std::optional<PrimaryKey> &max_primary_key) {
-  SplitData data;
+SplitData Splitter::SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key) {
+  SplitData data{.primary_label = primary_label_,
+                 .min_primary_key = split_key,
+                 .max_primary_key = max_primary_key,
+                 .schema = schema_,
+                 .config = config_,
+                 .id_to_name = name_id_mapper_.GetIdToNameMap()};
 
   std::set<uint64_t> collected_transactions_;
   data.vertices = CollectVertices(data, collected_transactions_, split_key);
   data.edges = CollectEdges(collected_transactions_, data.vertices, split_key);
   data.transactions = CollectTransactions(collected_transactions_, data.vertices, *data.edges);
 
-  if (data.edges) {
-    return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
-                                   std::move(*data.edges), std::move(data.transactions), config_);
-  }
-  return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
-                                 std::move(data.transactions), config_);
+  // if (data.edges) {
+  //   return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
+  //                                  std::move(*data.edges), std::move(data.transactions), config_,
+  //                                  name_id_mapper_.GetIdToNameMap());
+  // }
+  // return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
+  //                                std::move(data.transactions), config_, name_id_mapper_.GetIdToNameMap());
+  return data;
 }
 
 void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_, Delta *delta) {
diff --git a/src/storage/v3/splitter.hpp b/src/storage/v3/splitter.hpp
index 9822aa8f0..c8fc3fcd2 100644
--- a/src/storage/v3/splitter.hpp
+++ b/src/storage/v3/splitter.hpp
@@ -21,6 +21,7 @@
 #include "storage/v3/id_types.hpp"
 #include "storage/v3/indices.hpp"
 #include "storage/v3/key_store.hpp"
+#include "storage/v3/name_id_mapper.hpp"
 #include "storage/v3/schemas.hpp"
 #include "storage/v3/transaction.hpp"
 #include "storage/v3/vertex.hpp"
@@ -31,6 +32,13 @@ namespace memgraph::storage::v3 {
 // If edge properties-on-edges is false then we don't need to send edges but
 // only vertices, since they will contain those edges
 struct SplitData {
+  LabelId primary_label;
+  PrimaryKey min_primary_key;
+  std::optional<PrimaryKey> max_primary_key;
+  std::vector<SchemaProperty> schema;
+  Config config;
+  std::unordered_map<uint64_t, std::string> id_to_name;
+
   VertexContainer vertices;
   std::optional<EdgeContainer> edges;
   std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
@@ -42,7 +50,7 @@ class Splitter final {
  public:
   Splitter(LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges,
            std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
-           Config &config, const std::vector<SchemaProperty> &schema);
+           const Config &config, const std::vector<SchemaProperty> &schema, const NameIdMapper &name_id_mapper_);
 
   Splitter(const Splitter &) = delete;
   Splitter(Splitter &&) noexcept = delete;
@@ -50,19 +58,19 @@ class Splitter final {
   Splitter operator=(Splitter &&) noexcept = delete;
   ~Splitter() = default;
 
-  std::unique_ptr<Shard> SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key);
+  SplitData SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key);
 
  private:
-  std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
-      const std::set<uint64_t> &collected_transactions_start_id, VertexContainer &cloned_vertices,
-      EdgeContainer &cloned_edges);
-
   VertexContainer CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id,
                                   const PrimaryKey &split_key);
 
   std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
                                             const VertexContainer &split_vertices, const PrimaryKey &split_key);
 
+  std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
+      const std::set<uint64_t> &collected_transactions_start_id, VertexContainer &cloned_vertices,
+      EdgeContainer &cloned_edges);
+
   template <typename IndexMap, typename IndexType>
   requires utils::SameAsAnyOf<IndexMap, LabelPropertyIndex, LabelIndex>
       std::map<IndexType, typename IndexMap::IndexContainer> CollectIndexEntries(
@@ -97,13 +105,14 @@ class Splitter final {
   void AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
                                 VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
 
-  LabelId primary_label_;
+  const LabelId primary_label_;
   VertexContainer &vertices_;
   EdgeContainer &edges_;
   std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction_;
   Indices &indices_;
-  Config &config_;
-  std::vector<SchemaProperty> schema_;
+  const Config &config_;
+  const std::vector<SchemaProperty> schema_;
+  const NameIdMapper &name_id_mapper_;
 };
 
 }  // namespace memgraph::storage::v3
diff --git a/tests/unit/storage_v3_shard_split.cpp b/tests/unit/storage_v3_shard_split.cpp
index 6c98fa924..f1c52dad6 100644
--- a/tests/unit/storage_v3_shard_split.cpp
+++ b/tests/unit/storage_v3_shard_split.cpp
@@ -56,31 +56,29 @@ class ShardSplitTest : public testing::Test {
   }
 };
 
-void AssertEqVertexContainer(const VertexContainer &expected, const VertexContainer &actual) {
-  ASSERT_EQ(expected.size(), actual.size());
+void AssertEqVertexContainer(const VertexContainer &actual, const VertexContainer &expected) {
+  ASSERT_EQ(actual.size(), expected.size());
 
   auto expected_it = expected.begin();
   auto actual_it = actual.begin();
   while (expected_it != expected.end()) {
-    EXPECT_EQ(expected_it->first, actual_it->first);
-    EXPECT_EQ(expected_it->second.deleted, actual_it->second.deleted);
-    EXPECT_EQ(expected_it->second.in_edges, actual_it->second.in_edges);
-    EXPECT_EQ(expected_it->second.out_edges, actual_it->second.out_edges);
-    EXPECT_EQ(expected_it->second.labels, actual_it->second.labels);
+    EXPECT_EQ(actual_it->first, expected_it->first);
+    EXPECT_EQ(actual_it->second.deleted, expected_it->second.deleted);
+    EXPECT_EQ(actual_it->second.labels, expected_it->second.labels);
 
     auto *expected_delta = expected_it->second.delta;
     auto *actual_delta = actual_it->second.delta;
     while (expected_delta != nullptr) {
-      EXPECT_EQ(expected_delta->action, actual_delta->action);
+      EXPECT_EQ(actual_delta->action, expected_delta->action);
       switch (expected_delta->action) {
         case Delta::Action::ADD_LABEL:
         case Delta::Action::REMOVE_LABEL: {
-          EXPECT_EQ(expected_delta->label, actual_delta->label);
+          EXPECT_EQ(actual_delta->label, expected_delta->label);
           break;
         }
         case Delta::Action::SET_PROPERTY: {
-          EXPECT_EQ(expected_delta->property.key, actual_delta->property.key);
-          EXPECT_EQ(expected_delta->property.value, actual_delta->property.value);
+          EXPECT_EQ(actual_delta->property.key, expected_delta->property.key);
+          EXPECT_EQ(actual_delta->property.value, expected_delta->property.value);
           break;
         }
         case Delta::Action::ADD_IN_EDGE:
@@ -147,7 +145,7 @@ TEST_F(ShardSplitTest, TestBasicSplitWithVertices) {
   AddDeltaToDeltaChain(&*it, &delta_add_property);
   AddDeltaToDeltaChain(&*it, &delta_add_label);
 
-  AssertEqVertexContainer(expected_vertices, splitted_data.vertices);
+  AssertEqVertexContainer(splitted_data.vertices, expected_vertices);
 }
 
 TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
@@ -159,9 +157,6 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
   EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
   EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
 
-  EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
-                              VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
-                   .HasError());
   EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
                               VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
                    .HasError());
@@ -169,7 +164,8 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
                               VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
                    .HasError());
 
-  acc.Commit(GetNextHlc());
+  auto current_hlc = GetNextHlc();
+  acc.Commit(current_hlc);
 
   auto splitted_data = storage.PerformSplit({PropertyValue(4)});
   EXPECT_EQ(splitted_data.vertices.size(), 3);
@@ -177,6 +173,26 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
   EXPECT_EQ(splitted_data.transactions.size(), 1);
   EXPECT_EQ(splitted_data.label_indices.size(), 0);
   EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
+
+  CommitInfo commit_info{.start_or_commit_timestamp = current_hlc};
+  Delta delta_delete1{Delta::DeleteObjectTag{}, &commit_info, 1};
+  Delta delta_delete2{Delta::DeleteObjectTag{}, &commit_info, 1};
+  Delta delta_delete3{Delta::DeleteObjectTag{}, &commit_info, 1};
+  Delta delta_add_in_edge1{Delta::RemoveInEdgeTag{},  edge_type_id, VertexId{primary_label, {PropertyValue(1)}},
+                           EdgeRef{Gid::FromUint(1)}, &commit_info, 1};
+  Delta delta_add_out_edge2{Delta::RemoveOutEdgeTag{}, edge_type_id, VertexId{primary_label, {PropertyValue(6)}},
+                            EdgeRef{Gid::FromUint(2)}, &commit_info, 1};
+  Delta delta_add_in_edge2{Delta::RemoveInEdgeTag{},  edge_type_id, VertexId{primary_label, {PropertyValue(4)}},
+                           EdgeRef{Gid::FromUint(2)}, &commit_info, 1};
+  VertexContainer expected_vertices;
+  auto [vtx4, inserted4] = expected_vertices.emplace(PrimaryKey{PropertyValue{4}}, VertexData(&delta_delete1));
+  auto [vtx5, inserted5] = expected_vertices.emplace(PrimaryKey{PropertyValue{5}}, VertexData(&delta_delete2));
+  auto [vtx6, inserted6] = expected_vertices.emplace(PrimaryKey{PropertyValue{6}}, VertexData(&delta_delete3));
+  AddDeltaToDeltaChain(&*vtx4, &delta_add_out_edge2);
+  AddDeltaToDeltaChain(&*vtx5, &delta_add_in_edge1);
+  AddDeltaToDeltaChain(&*vtx6, &delta_add_in_edge2);
+
+  AssertEqVertexContainer(splitted_data.vertices, expected_vertices);
 }
 
 TEST_F(ShardSplitTest, TestBasicSplitBeforeCommit) {