From 742393cd70021c5b6745f2686e6541da0d4c5e31 Mon Sep 17 00:00:00 2001
From: jbajic <jure.bajic@memgraph.com>
Date: Tue, 24 Jan 2023 20:49:04 +0100
Subject: [PATCH] Fix index split

---
 src/storage/v3/splitter.cpp           | 31 ++++++++++--------
 src/storage/v3/splitter.hpp           | 25 ++++++++++-----
 tests/unit/storage_v3_shard_split.cpp | 46 +++++++++++++++++++++++++++
 3 files changed, 80 insertions(+), 22 deletions(-)

diff --git a/src/storage/v3/splitter.cpp b/src/storage/v3/splitter.cpp
index 5cbc84f50..24472f29d 100644
--- a/src/storage/v3/splitter.cpp
+++ b/src/storage/v3/splitter.cpp
@@ -25,6 +25,7 @@
 #include "storage/v3/shard.hpp"
 #include "storage/v3/transaction.hpp"
 #include "storage/v3/vertex.hpp"
+#include "utils/logging.hpp"
 
 namespace memgraph::storage::v3 {
 
@@ -53,13 +54,6 @@ SplitData Splitter::SplitShard(const PrimaryKey &split_key, const std::optional<
   data.edges = CollectEdges(collected_transactions_, data.vertices, split_key);
   data.transactions = CollectTransactions(collected_transactions_, data.vertices, *data.edges);
 
-  // if (data.edges) {
-  //   return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
-  //                                  std::move(*data.edges), std::move(data.transactions), config_,
-  //                                  name_id_mapper_.GetIdToNameMap());
-  // }
-  // return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
-  //                                std::move(data.transactions), config_, name_id_mapper_.GetIdToNameMap());
   return data;
 }
 
@@ -74,18 +68,26 @@ VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &c
                                           const PrimaryKey &split_key) {
   // Collection of indices is here since it heavily depends on vertices
   // Old vertex pointer new entry pointer
-  std::map<LabelId, std::multimap<const Vertex *, LabelIndex::Entry *>> label_index_vertex_entry_map;
-  std::map<std::pair<LabelId, PropertyId>, std::multimap<const Vertex *, LabelPropertyIndex::Entry *>>
+
+  std::map<LabelId, std::multimap<const Vertex *, const LabelIndex::IndexContainer::iterator>>
+      label_index_vertex_entry_map;
+  std::map<std::pair<LabelId, PropertyId>,
+           std::multimap<const Vertex *, const LabelPropertyIndex::IndexContainer::iterator>>
       label_property_vertex_entry_map;
+
   data.label_indices =
       CollectIndexEntries<LabelIndex, LabelId>(indices_.label_index, split_key, label_index_vertex_entry_map);
   data.label_property_indices = CollectIndexEntries<LabelPropertyIndex, std::pair<LabelId, PropertyId>>(
       indices_.label_property_index, split_key, label_property_vertex_entry_map);
-  const auto update_indices = [](auto &index_map, const auto *old_vertex_ptr, auto &splitted_vertex_it) {
-    for (auto &[label, vertex_entry_mappings] : index_map) {
+  const auto update_indices = [](auto &entry_vertex_map, auto &updating_index, const auto *old_vertex_ptr,
+                                 auto &new_vertex_ptr) {
+    for ([[maybe_unused]] auto &[index_type, vertex_entry_mappings] : entry_vertex_map) {
       auto [it, end] = vertex_entry_mappings.equal_range(old_vertex_ptr);
       while (it != end) {
-        it->second->vertex = &*splitted_vertex_it;
+        auto entry_to_update = *it->second;
+        entry_to_update.vertex = &*new_vertex_ptr;
+        updating_index.at(index_type).erase(it->second);
+        updating_index.at(index_type).insert(std::move(entry_to_update));
         ++it;
       }
     }
@@ -101,10 +103,11 @@ VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &c
     auto next_it = std::next(split_key_it);
 
     const auto &[splitted_vertex_it, inserted, node] = splitted_data.insert(vertices_.extract(split_key_it->first));
+    MG_ASSERT(inserted, "Failed to extract vertex!");
 
     // Update indices
-    update_indices(label_index_vertex_entry_map, old_vertex_ptr, splitted_vertex_it);
-    update_indices(label_property_vertex_entry_map, old_vertex_ptr, splitted_vertex_it);
+    update_indices(label_index_vertex_entry_map, data.label_indices, old_vertex_ptr, splitted_vertex_it);
+    // update_indices(label_property_vertex_entry_map, old_vertex_ptr, splitted_vertex_it);
 
     split_key_it = next_it;
   }
diff --git a/src/storage/v3/splitter.hpp b/src/storage/v3/splitter.hpp
index c8fc3fcd2..cc9ca88fa 100644
--- a/src/storage/v3/splitter.hpp
+++ b/src/storage/v3/splitter.hpp
@@ -75,21 +75,30 @@ class Splitter final {
   requires utils::SameAsAnyOf<IndexMap, LabelPropertyIndex, LabelIndex>
       std::map<IndexType, typename IndexMap::IndexContainer> CollectIndexEntries(
           IndexMap &index, const PrimaryKey &split_key,
-          std::map<IndexType, std::multimap<const Vertex *, typename IndexMap::Entry *>> &vertex_entry_map) {
+          std::map<IndexType, std::multimap<const Vertex *, const typename IndexMap::IndexContainer::iterator>>
+              &vertex_entry_map) {
     if (index.Empty()) {
       return {};
     }
 
     std::map<IndexType, typename IndexMap::IndexContainer> cloned_indices;
-    for (auto &[label_prop_pair, index] : index.GetIndex()) {
-      cloned_indices[label_prop_pair] = typename IndexMap::IndexContainer{};
-      for (const auto &entry : index) {
-        if (entry.vertex->first > split_key) {
+    for (auto &[index_type_val, index] : index.GetIndex()) {
+      // cloned_indices[index_type_val] = typename IndexMap::IndexContainer{};
+
+      auto entry_it = index.begin();
+      while (entry_it != index.end()) {
+        // We need to save the next pointer since the current one will be
+        // invalidated after extract
+        auto next_entry_it = std::next(entry_it);
+        if (entry_it->vertex->first > split_key) {
           // We get this entry
-          [[maybe_unused]] const auto [it, inserted, node] =
-              cloned_indices[label_prop_pair].insert(index.extract(entry));
-          vertex_entry_map[label_prop_pair].insert({entry.vertex, &node.value()});
+          [[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
+              cloned_indices[index_type_val].insert(index.extract(entry_it));
+          MG_ASSERT(inserted, "Failed to extract index entry!");
+
+          vertex_entry_map[index_type_val].insert({inserted_entry_it->vertex, inserted_entry_it});
         }
+        entry_it = next_entry_it;
       }
     }
 
diff --git a/tests/unit/storage_v3_shard_split.cpp b/tests/unit/storage_v3_shard_split.cpp
index f1c52dad6..3afda33ad 100644
--- a/tests/unit/storage_v3_shard_split.cpp
+++ b/tests/unit/storage_v3_shard_split.cpp
@@ -253,4 +253,50 @@ TEST_F(ShardSplitTest, TestBasicSplitWithCommitedAndOngoingTransactions) {
   EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
 }
 
+TEST_F(ShardSplitTest, TestBasicSplitWithLabelIndex) {
+  auto acc = storage.Access(GetNextHlc());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(6)}, {}).HasError());
+  acc.Commit(GetNextHlc());
+  storage.CreateIndex(secondary_label);
+
+  auto splitted_data = storage.PerformSplit({PropertyValue(4)});
+
+  EXPECT_EQ(splitted_data.vertices.size(), 3);
+  EXPECT_EQ(splitted_data.edges->size(), 0);
+  EXPECT_EQ(splitted_data.transactions.size(), 1);
+  EXPECT_EQ(splitted_data.label_indices.size(), 1);
+  EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
+}
+
+TEST_F(ShardSplitTest, TestBasicSplitWithLabelPropertyIndex) {
+  auto acc = storage.Access(GetNextHlc());
+  EXPECT_FALSE(
+      acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {{secondary_property, PropertyValue(1)}})
+          .HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
+  EXPECT_FALSE(
+      acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {{secondary_property, PropertyValue(21)}})
+          .HasError());
+  EXPECT_FALSE(
+      acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(6)}, {{secondary_property, PropertyValue(22)}})
+          .HasError());
+  acc.Commit(GetNextHlc());
+  storage.CreateIndex(secondary_label, secondary_property);
+
+  auto splitted_data = storage.PerformSplit({PropertyValue(4)});
+
+  EXPECT_EQ(splitted_data.vertices.size(), 3);
+  EXPECT_EQ(splitted_data.edges->size(), 0);
+  EXPECT_EQ(splitted_data.transactions.size(), 1);
+  EXPECT_EQ(splitted_data.label_indices.size(), 0);
+  EXPECT_EQ(splitted_data.label_property_indices.size(), 1);
+}
+
 }  // namespace memgraph::storage::v3::tests