From 39cfcca2e5e1dfc158b7d6ccee2355f4afe496d7 Mon Sep 17 00:00:00 2001
From: jbajic <jure.bajic@memgraph.com>
Date: Fri, 17 Mar 2023 11:16:46 +0100
Subject: [PATCH] Fix ASAN bug

---
 src/storage/v3/splitter.cpp | 110 ++++++++++++++++++++++++++++++++----
 src/storage/v3/splitter.hpp |   3 +
 2 files changed, 103 insertions(+), 10 deletions(-)

diff --git a/src/storage/v3/splitter.cpp b/src/storage/v3/splitter.cpp
index e7de81f79..0bd69d8c5 100644
--- a/src/storage/v3/splitter.cpp
+++ b/src/storage/v3/splitter.cpp
@@ -145,8 +145,10 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
   // Remove delta chains that don't point to objects on splitted shard
   auto cloned_delta_it = cloned_transaction.deltas.begin();
 
-  const auto remove_from_delta_chain = [&cloned_transaction, &cloned_transactions](auto &cloned_delta_it) {
+  // Erases the delta chain
+  const auto erase_delta_chain = [&cloned_transaction, &cloned_transactions](auto &cloned_delta_it) {
     auto *current_next_delta = cloned_delta_it->next;
+    // We need to keep track of cloned_delta_it in the delta list of current transaction
     cloned_delta_it = cloned_transaction.deltas.erase(cloned_delta_it);
 
     while (current_next_delta != nullptr) {
@@ -160,9 +162,21 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
                    transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp;
           });
       MG_ASSERT(current_transaction_it != cloned_transactions.end(), "Error when pruning deltas!");
-      // Remove it
-      current_transaction_it->second->deltas.remove_if(
-          [&current_next_delta = *current_next_delta](const auto &delta) { return delta == current_next_delta; });
+      // Remove the delta
+      const auto delta_it =
+          std::ranges::find_if(current_transaction_it->second->deltas,
+                               [current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; });
+      if (delta_it != current_transaction_it->second->deltas.end()) {
+        // If the next delta is next in transaction list replace current_transaction_it
+        // with the next one
+        if (current_transaction_it->second->start_timestamp == cloned_transaction.start_timestamp &&
+            current_transaction_it == std::next(current_transaction_it)) {
+          // TODO Dont do this if the delta_it is not next in line in transaction list
+          cloned_delta_it = current_transaction_it->second->deltas.erase(delta_it);
+        } else {
+          current_transaction_it->second->deltas.erase(delta_it);
+        }
+      }
 
       current_next_delta = next_delta;
     }
@@ -178,7 +192,7 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
       case PreviousPtr::Type::VERTEX: {
         if (prev.vertex->first < split_key) {
           // We can remove this delta chain
-          remove_from_delta_chain(cloned_delta_it);
+          erase_delta_chain(cloned_delta_it);
         } else {
           ++cloned_delta_it;
         }
@@ -187,11 +201,81 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
       case PreviousPtr::Type::EDGE: {
         if (const auto edge_gid = prev.edge->gid; !cloned_edges.contains(edge_gid)) {
           // We can remove this delta chain
-          remove_from_delta_chain(cloned_delta_it);
+          erase_delta_chain(cloned_delta_it);
         } else {
           ++cloned_delta_it;
-          break;
         }
+        break;
+      }
+    }
+  }
+}
+
+void Splitter::PruneOriginalDeltas(Transaction &transaction,
+                                   std::map<uint64_t, std::unique_ptr<Transaction>> &transactions,
+                                   const PrimaryKey &split_key) {
+  // Remove delta chains that don't point to objects on splitted shard
+  auto delta_it = transaction.deltas.begin();
+
+  const auto erase_delta_chain = [&transaction, &transactions](auto &cloned_delta_it) {
+    auto *current_next_delta = cloned_delta_it->next;
+    // We need to keep track of cloned_delta_it in the delta list of current transaction
+    cloned_delta_it = transaction.deltas.erase(cloned_delta_it);
+
+    while (current_next_delta != nullptr) {
+      auto *next_delta = current_next_delta->next;
+      // Find next delta transaction delta list
+      auto current_transaction_it = std::ranges::find_if(
+          transactions, [&start_or_commit_timestamp =
+                             current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) {
+            return transaction.second->start_timestamp == start_or_commit_timestamp ||
+                   transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp;
+          });
+      MG_ASSERT(current_transaction_it != transactions.end(), "Error when pruning deltas!");
+      // Remove the delta
+      const auto delta_it =
+          std::ranges::find_if(current_transaction_it->second->deltas,
+                               [current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; });
+      if (delta_it != current_transaction_it->second->deltas.end()) {
+        // If the next delta is next in transaction list replace current_transaction_it
+        // with the next one
+        if (current_transaction_it->second->start_timestamp == transaction.start_timestamp &&
+            current_transaction_it == std::next(current_transaction_it)) {
+          // TODO Dont do this if the delta_it is not next in line in transaction list
+          cloned_delta_it = current_transaction_it->second->deltas.erase(delta_it);
+        } else {
+          current_transaction_it->second->deltas.erase(delta_it);
+        }
+      }
+
+      current_next_delta = next_delta;
+    }
+  };
+
+  while (delta_it != transaction.deltas.end()) {
+    const auto prev = delta_it->prev.Get();
+    switch (prev.type) {
+      case PreviousPtr::Type::DELTA:
+      case PreviousPtr::Type::NULLPTR:
+        ++delta_it;
+        break;
+      case PreviousPtr::Type::VERTEX: {
+        if (prev.vertex->first >= split_key) {
+          // We can remove this delta chain
+          erase_delta_chain(delta_it);
+        } else {
+          ++delta_it;
+        }
+        break;
+      }
+      case PreviousPtr::Type::EDGE: {
+        if (const auto edge_gid = prev.edge->gid; !edges_.contains(edge_gid)) {
+          // We can remove this delta chain
+          erase_delta_chain(delta_it);
+        } else {
+          ++delta_it;
+        }
+        break;
       }
     }
   }
@@ -200,15 +284,19 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
 void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
                                         VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
                                         const PrimaryKey &split_key) {
-  for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
-    AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
+  for (auto &[start_id, cloned_transaction] : cloned_transactions) {
+    AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[start_id], cloned_transactions,
                             cloned_vertices, cloned_edges, split_key);
   }
   // Prune deltas whose delta chain points to vertex/edge that should not belong on that shard
   // Prune must be after adjust, since next, and prev are not set and we cannot follow the chain
-  for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
+  for (auto &[start_id, cloned_transaction] : cloned_transactions) {
     PruneDeltas(*cloned_transaction, cloned_transactions, split_key, cloned_edges);
   }
+  // Also we need to remove deltas from original transactions
+  for (auto &[start_id, original_transaction] : start_logical_id_to_transaction_) {
+    PruneOriginalDeltas(*original_transaction, start_logical_id_to_transaction_, split_key);
+  }
 }
 
 inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) {
@@ -350,11 +438,13 @@ void Splitter::AdjustDeltaPrevPtr(const Delta &original, Delta &cloned,
     case PreviousPtr::Type::VERTEX: {
       // The vertex was extracted and it is safe to reuse address
       cloned.prev.Set(ptr.vertex);
+      ptr.vertex->second.delta = &cloned;
       break;
     }
     case PreviousPtr::Type::EDGE: {
       // We can never be here if we have properties on edge disabled
       auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid);
+      ptr.edge->delta = &cloned;
       cloned.prev.Set(&cloned_edge->second);
       break;
     }
diff --git a/src/storage/v3/splitter.hpp b/src/storage/v3/splitter.hpp
index 5e5867127..72869e9cb 100644
--- a/src/storage/v3/splitter.hpp
+++ b/src/storage/v3/splitter.hpp
@@ -77,6 +77,9 @@ class Splitter final {
 
   static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, const Delta *delta);
 
+  void PruneOriginalDeltas(Transaction &transaction, std::map<uint64_t, std::unique_ptr<Transaction>> &transactions,
+                           const PrimaryKey &split_key);
+
   void AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
                                std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
                                VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,