From a97d9945153a2af39e076ded76ade23c23ef92eb Mon Sep 17 00:00:00 2001
From: jbajic <jure.bajic@memgraph.com>
Date: Thu, 12 Jan 2023 16:18:16 +0100
Subject: [PATCH] Fix vertice split

---
 src/storage/v3/delta.hpp              |  5 +-
 src/storage/v3/shard.cpp              |  6 +-
 tests/unit/CMakeLists.txt             |  3 +
 tests/unit/storage_v3_shard_split.cpp | 87 +++++++++++++++++++++++++++
 4 files changed, 97 insertions(+), 4 deletions(-)
 create mode 100644 tests/unit/storage_v3_shard_split.cpp

diff --git a/src/storage/v3/delta.hpp b/src/storage/v3/delta.hpp
index 69da63e77..39c9975f6 100644
--- a/src/storage/v3/delta.hpp
+++ b/src/storage/v3/delta.hpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -133,7 +133,8 @@ inline bool operator!=(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer
 
 struct Delta {
   // Needed for splits
-  boost::uuids::uuid uuid;
+  // TODO Replace this with int identifier
+  boost::uuids::uuid uuid{boost::uuids::uuid()};
 
   enum class Action {
     // Used for both Vertex and Edge
diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp
index 856906385..bf89ed58c 100644
--- a/src/storage/v3/shard.cpp
+++ b/src/storage/v3/shard.cpp
@@ -1066,12 +1066,14 @@ void Shard::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delt
 VertexContainer Shard::CollectVertices(std::set<uint64_t> &collected_transactions_start_id,
                                        const PrimaryKey &split_key) {
   VertexContainer splitted_data;
-  auto split_key_it = vertices_.find(split_key);
 
-  for (; split_key_it != vertices_.end(); split_key_it++) {
+  auto split_key_it = vertices_.find(split_key);
+  while (split_key_it != vertices_.end()) {
     // Go through deltas and pick up transactions start_id
     ScanDeltas(collected_transactions_start_id, split_key_it->second.delta);
+    auto next_it = std::next(split_key_it);
     splitted_data.insert(vertices_.extract(split_key_it->first));
+    split_key_it = next_it;
   }
   return splitted_data;
 }
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index 5bfa26afd..cc23a3e5e 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -291,6 +291,9 @@ target_link_libraries(${test_prefix}storage_v3_expr mg-storage-v3 mg-expr)
 add_unit_test(storage_v3_schema.cpp)
 target_link_libraries(${test_prefix}storage_v3_schema mg-storage-v3)
 
+add_unit_test(storage_v3_shard_split.cpp)
+target_link_libraries(${test_prefix}storage_v3_shard_split mg-storage-v3  mg-query-v2)
+
 # Test mg-query-v2
 # These are commented out because of the new TypedValue in the query engine
 # add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp)
diff --git a/tests/unit/storage_v3_shard_split.cpp b/tests/unit/storage_v3_shard_split.cpp
new file mode 100644
index 000000000..d1bafb65c
--- /dev/null
+++ b/tests/unit/storage_v3_shard_split.cpp
@@ -0,0 +1,87 @@
+// Copyright 2023 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#include <gtest/gtest.h>
+#include <cstdint>
+
+#include "query/v2/requests.hpp"
+#include "storage/v3/id_types.hpp"
+#include "storage/v3/key_store.hpp"
+#include "storage/v3/property_value.hpp"
+#include "storage/v3/shard.hpp"
+#include "storage/v3/vertex_id.hpp"
+
+namespace memgraph::storage::v3::tests {
+
+class ShardSplitTest : public testing::Test {
+ protected:
+  void SetUp() override { storage.StoreMapping({{1, "label"}, {2, "property"}, {3, "edge_property"}}); }
+
+  const PropertyId primary_property{PropertyId::FromUint(2)};
+  std::vector<storage::v3::SchemaProperty> schema_property_vector = {
+      storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
+  const std::vector<PropertyValue> min_pk{PropertyValue{0}};
+  const LabelId primary_label{LabelId::FromUint(1)};
+  const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)};
+  Shard storage{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector};
+
+  coordinator::Hlc last_hlc{0, io::Time{}};
+
+  coordinator::Hlc GetNextHlc() {
+    ++last_hlc.logical_id;
+    last_hlc.coordinator_wall_clock += std::chrono::seconds(1);
+    return last_hlc;
+  }
+};
+
+TEST_F(ShardSplitTest, TestBasicSplitWithVertices) {
+  auto acc = storage.Access(GetNextHlc());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
+  acc.Commit(GetNextHlc());
+  storage.CollectGarbage(GetNextHlc().coordinator_wall_clock);
+
+  auto splitted_data = storage.PerformSplit({PropertyValue(4)});
+  EXPECT_EQ(splitted_data.vertices.size(), 3);
+  EXPECT_EQ(splitted_data.edges->size(), 0);
+  EXPECT_EQ(splitted_data.transactions.size(), 0);
+}
+
+TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
+  auto acc = storage.Access(GetNextHlc());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
+
+  EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
+                              VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
+                   .HasError());
+  EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
+                              VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(0))
+                   .HasError());
+  EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
+                              VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(0))
+                   .HasError());
+
+  acc.Commit(GetNextHlc());
+  storage.CollectGarbage(GetNextHlc().coordinator_wall_clock);
+
+  auto splitted_data = storage.PerformSplit({PropertyValue(4)});
+}
+
+}  // namespace memgraph::storage::v3::tests