From 5f509532f235afcd42b18c4fb9134d844513b176 Mon Sep 17 00:00:00 2001
From: Aidar Samerkhanov <aidar.samerkhanov@memgraph.io>
Date: Mon, 28 Aug 2023 09:56:17 +0300
Subject: [PATCH] Add timestamp to DELETE_DESERIALIZED_OBJECT delta at which
 this object was created. (#1179)

Add timestamp to DELETE_DESERIALIZED_OBJECT delta at which this object was created.
RocksDB currently doesn't provide timestamp() functionality in iterators of TransationDB.
Because of that we are using constant "0" timestamp for DELETE_DESERIALIZED_OBJECT.
---
 src/storage/v2/delta.hpp        | 13 +++--
 src/storage/v2/disk/storage.cpp | 84 ++++++++++++++++++++++-----------
 src/storage/v2/disk/storage.hpp | 10 ++--
 src/storage/v2/mvcc.hpp         | 19 +++++---
 src/utils/disk_utils.hpp        |  8 ++++
 5 files changed, 93 insertions(+), 41 deletions(-)

diff --git a/src/storage/v2/delta.hpp b/src/storage/v2/delta.hpp
index 7af335c31..66d7c37f4 100644
--- a/src/storage/v2/delta.hpp
+++ b/src/storage/v2/delta.hpp
@@ -154,9 +154,14 @@ struct Delta {
   struct RemoveInEdgeTag {};
   struct RemoveOutEdgeTag {};
 
-  Delta(DeleteDeserializedObjectTag /*tag*/, std::atomic<uint64_t> *timestamp,
-        const std::optional<std::string> &old_disk_key)
-      : action(Action::DELETE_DESERIALIZED_OBJECT), timestamp(timestamp), command_id(0), old_disk_key(old_disk_key) {}
+  // DELETE_DESERIALIZED_OBJECT is used to load data from disk committed by past txs.
+  // Because of this object was created in past txs, we create timestamp by ourselves inside instead of having it from
+  // current tx. This timestamp we got from RocksDB timestamp stored in key.
+  Delta(DeleteDeserializedObjectTag /*tag*/, uint64_t ts, const std::optional<std::string> &old_disk_key)
+      : action(Action::DELETE_DESERIALIZED_OBJECT),
+        timestamp(new std::atomic<uint64_t>(ts)),
+        command_id(0),
+        old_disk_key(old_disk_key) {}
 
   Delta(DeleteObjectTag /*tag*/, std::atomic<uint64_t> *timestamp, uint64_t command_id)
       : action(Action::DELETE_OBJECT), timestamp(timestamp), command_id(command_id) {}
@@ -224,6 +229,8 @@ struct Delta {
         break;
       case Action::DELETE_DESERIALIZED_OBJECT:
         old_disk_key.reset();
+        delete timestamp;
+        timestamp = nullptr;
         break;
       case Action::SET_PROPERTY:
         property.value.~PropertyValue();
diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp
index b375ec096..a6910fa62 100644
--- a/src/storage/v2/disk/storage.cpp
+++ b/src/storage/v2/disk/storage.cpp
@@ -12,6 +12,7 @@
 #include <limits>
 #include <optional>
 #include <stdexcept>
+#include <string>
 #include <string_view>
 #include <vector>
 
@@ -59,6 +60,7 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
 
 namespace {
 
+constexpr const char *deserializeTimestamp = "0";
 constexpr const char *vertexHandle = "vertex";
 constexpr const char *edgeHandle = "edge";
 constexpr const char *defaultHandle = "default";
@@ -311,8 +313,9 @@ DiskStorage::DiskAccessor::~DiskAccessor() {
 }
 
 /// NOTE: This will create Delta object which will cause deletion of old key entry on the disk
-std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMainMemoryCache(std::string &&key,
-                                                                                              std::string &&value) {
+std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMainMemoryCache(const std::string &key,
+                                                                                              const std::string &value,
+                                                                                              const std::string &ts) {
   auto main_storage_accessor = vertices_.access();
 
   storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key)));
@@ -322,7 +325,7 @@ std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMa
   std::vector<LabelId> labels_id{utils::DeserializeLabelsFromMainDiskStorage(key)};
   PropertyStore properties{utils::DeserializePropertiesFromMainDiskStorage(value)};
   return CreateVertex(main_storage_accessor, gid, std::move(labels_id), std::move(properties),
-                      CreateDeleteDeserializedObjectDelta(&transaction_, key));
+                      CreateDeleteDeserializedObjectDelta(&transaction_, key, ts));
 }
 
 std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLabelIndexCache(
@@ -354,7 +357,8 @@ std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLa
 }
 
 std::optional<EdgeAccessor> DiskStorage::DiskAccessor::DeserializeEdge(const rocksdb::Slice &key,
-                                                                       const rocksdb::Slice &value) {
+                                                                       const rocksdb::Slice &value,
+                                                                       const rocksdb::Slice &ts) {
   const auto edge_parts = utils::Split(key.ToStringView(), "|");
   const Gid edge_gid = Gid::FromUint(std::stoull(edge_parts[4]));
 
@@ -380,7 +384,8 @@ std::optional<EdgeAccessor> DiskStorage::DiskAccessor::DeserializeEdge(const roc
     throw utils::BasicException("Non-existing vertices found during edge deserialization");
   }
   const auto edge_type_id = storage::EdgeTypeId::FromUint(std::stoull(edge_parts[3]));
-  auto maybe_edge = CreateEdge(&*from_acc, &*to_acc, edge_type_id, edge_gid, value.ToStringView(), key.ToString());
+  auto maybe_edge =
+      CreateEdge(&*from_acc, &*to_acc, edge_type_id, edge_gid, value.ToStringView(), key.ToString(), ts.ToString());
   MG_ASSERT(maybe_edge.HasValue());
 
   return *maybe_edge;
@@ -399,7 +404,9 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) {
   auto it =
       std::unique_ptr<rocksdb::Iterator>(disk_transaction_->GetIterator(ro, disk_storage->kvstore_->vertex_chandle));
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    LoadVertexToMainMemoryCache(it->key().ToString(), it->value().ToString());
+    // We should pass it->timestamp().ToString() instead of deserializeTimestamp
+    // This is hack until RocksDB will support timestamp() in WBWI iterator
+    LoadVertexToMainMemoryCache(it->key().ToString(), it->value().ToString(), deserializeTimestamp);
   }
   scanned_all_vertices_ = true;
   return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_,
@@ -430,11 +437,13 @@ std::unordered_set<Gid> DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWit
     if (VertexHasLabel(vertex, label, &transaction_, view)) {
       spdlog::trace("Loaded vertex with gid: {} from main index storage to label index",
                     utils::SerializeIdType(vertex.gid));
+      uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
       /// TODO: here are doing serialization and then later deserialization again -> expensive
-      LoadVertexToLabelIndexCache(label, utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
-                                  utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties),
-                                  CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
-                                  indexed_vertices->access());
+      LoadVertexToLabelIndexCache(
+          label, utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
+          utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties),
+          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts),
+          indexed_vertices->access());
     }
   }
   return gids;
@@ -460,9 +469,12 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelIndex(LabelId label,
     Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key)));
     spdlog::trace("Loaded vertex with key: {} from label index storage", key);
     if (key.starts_with(serialized_label) && !utils::Contains(gids, curr_gid)) {
-      LoadVertexToLabelIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
-                                  CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
-                                  indexed_vertices->access());
+      // We should pass it->timestamp().ToString() instead of deserializeTimestamp
+      // This is hack until RocksDB will support timestamp() in WBWI iterator
+      LoadVertexToLabelIndexCache(
+          label, index_it->key().ToString(), index_it->value().ToString(),
+          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp),
+          indexed_vertices->access());
     }
   }
 }
@@ -505,10 +517,11 @@ std::unordered_set<Gid> DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWit
     gids.insert(vertex.gid);
     /// TODO: delta support for clearing old disk keys
     if (label_property_filter(vertex, label, property, view)) {
+      uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
       LoadVertexToLabelPropertyIndexCache(
           label, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
           utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
-          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
+          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts),
           indexed_vertices->access());
     }
   }
@@ -538,9 +551,12 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndex(LabelId l
     Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key)));
     /// TODO: optimize
     if (label_property_filter(key, label_property_prefix, gids, curr_gid)) {
-      LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
-                                          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
-                                          indexed_vertices->access());
+      // We should pass it->timestamp().ToString() instead of deserializeTimestamp
+      // This is hack until RocksDB will support timestamp() in WBWI iterator
+      LoadVertexToLabelPropertyIndexCache(
+          label, index_it->key().ToString(), index_it->value().ToString(),
+          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp),
+          indexed_vertices->access());
     }
   }
 }
@@ -590,9 +606,12 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexWithPointV
     PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value);
     if (key.starts_with(label_property_prefix) && !utils::Contains(gids, curr_gid) &&
         properties.IsPropertyEqual(property, value)) {
-      LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
-                                          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
-                                          indexed_vertices->access());
+      // We should pass it->timestamp().ToString() instead of deserializeTimestamp
+      // This is hack until RocksDB will support timestamp() in WBWI iterator
+      LoadVertexToLabelPropertyIndexCache(
+          label, index_it->key().ToString(), index_it->value().ToString(),
+          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp),
+          indexed_vertices->access());
     }
   }
 }
@@ -630,10 +649,11 @@ DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelPropertyIndexCache
     auto prop_value = GetVertexProperty(vertex, property, &transaction_, view);
     if (VertexHasLabel(vertex, label, &transaction_, view) &&
         IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
+      uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
       LoadVertexToLabelPropertyIndexCache(
           label, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
           utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
-          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
+          CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts),
           indexed_vertices->access());
     }
   }
@@ -669,9 +689,12 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexForInterva
         !IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
       continue;
     }
-    LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
-                                        CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str),
-                                        indexed_vertices->access());
+    // We should pass it->timestamp().ToString() instead of deserializeTimestamp
+    // This is hack until RocksDB will support timestamp() in WBWI iterator
+    LoadVertexToLabelPropertyIndexCache(
+        label, index_it->key().ToString(), index_it->value().ToString(),
+        CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str, deserializeTimestamp),
+        indexed_vertices->access());
   }
 }
 
@@ -841,7 +864,9 @@ std::optional<VertexAccessor> DiskStorage::DiskAccessor::FindVertex(storage::Gid
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     std::string key = it->key().ToString();
     if (Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key))) == gid) {
-      return LoadVertexToMainMemoryCache(std::move(key), it->value().ToString());
+      // We should pass it->timestamp().ToString() instead of deserializeTimestamp
+      // This is hack until RocksDB will support timestamp() in WBWI iterator
+      return LoadVertexToMainMemoryCache(key, it->value().ToString(), deserializeTimestamp);
     }
   }
   return std::nullopt;
@@ -955,7 +980,9 @@ void DiskStorage::DiskAccessor::PrefetchEdges(const VertexAccessor &vertex_acc,
     const rocksdb::Slice &key = it->key();
     auto keyStr = key.ToStringView();
     if (PrefetchEdgeFilter(keyStr, vertex_acc, edge_direction)) {
-      DeserializeEdge(key, it->value());
+      // We should pass it->timestamp().ToString() instead of deserializeTimestamp
+      // This is hack until RocksDB will support timestamp() in WBWI iterator
+      DeserializeEdge(key, it->value(), deserializeTimestamp);
     }
   }
 }
@@ -971,7 +998,8 @@ void DiskStorage::DiskAccessor::PrefetchOutEdges(const VertexAccessor &vertex_ac
 Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(const VertexAccessor *from, const VertexAccessor *to,
                                                            EdgeTypeId edge_type, storage::Gid gid,
                                                            const std::string_view properties,
-                                                           const std::string &old_disk_key) {
+                                                           const std::string &old_disk_key,
+                                                           const std::string &read_ts) {
   OOMExceptionEnabler oom_exception;
   auto *from_vertex = from->vertex_;
   auto *to_vertex = to->vertex_;
@@ -985,7 +1013,7 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(const VertexAccessor
   EdgeRef edge(gid);
   if (config_.properties_on_edges) {
     auto acc = edges_.access();
-    auto *delta = CreateDeleteDeserializedObjectDelta(&transaction_, old_disk_key);
+    auto *delta = CreateDeleteDeserializedObjectDelta(&transaction_, old_disk_key, read_ts);
     auto [it, inserted] = acc.insert(Edge(gid, delta));
     MG_ASSERT(inserted, "The edge must be inserted here!");
     MG_ASSERT(it != acc.end(), "Invalid Edge accessor!");
diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp
index 6418ce3f2..601761ca4 100644
--- a/src/storage/v2/disk/storage.hpp
+++ b/src/storage/v2/disk/storage.hpp
@@ -22,6 +22,7 @@
 #include "utils/rw_lock.hpp"
 
 #include <rocksdb/db.h>
+#include <rocksdb/slice.h>
 #include <unordered_set>
 
 namespace memgraph::storage {
@@ -200,13 +201,15 @@ class DiskStorage final : public Storage {
         LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
         utils::SkipList<storage::Vertex>::Accessor index_accessor);
 
-    std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(std::string &&key, std::string &&value);
+    std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(const std::string &key, const std::string &value,
+                                                                       const std::string &ts);
 
     std::optional<storage::VertexAccessor> LoadVertexToLabelPropertyIndexCache(
         LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
         utils::SkipList<storage::Vertex>::Accessor index_accessor);
 
-    std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value);
+    std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value,
+                                                         const rocksdb::Slice &ts);
 
    private:
     VertexAccessor CreateVertex(utils::SkipList<Vertex>::Accessor &accessor, storage::Gid gid,
@@ -217,7 +220,8 @@ class DiskStorage final : public Storage {
     void PrefetchEdges(const VertexAccessor &vertex_acc, EdgeDirection edge_direction);
 
     Result<EdgeAccessor> CreateEdge(const VertexAccessor *from, const VertexAccessor *to, EdgeTypeId edge_type,
-                                    storage::Gid gid, std::string_view properties, const std::string &old_disk_key);
+                                    storage::Gid gid, std::string_view properties, const std::string &old_disk_key,
+                                    const std::string &ts);
 
     /// Flushes vertices and edges to the disk with the commit timestamp.
     /// At the time of calling, the commit_timestamp_ must already exist.
diff --git a/src/storage/v2/mvcc.hpp b/src/storage/v2/mvcc.hpp
index d5c50d342..37b01a2d5 100644
--- a/src/storage/v2/mvcc.hpp
+++ b/src/storage/v2/mvcc.hpp
@@ -114,17 +114,22 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
 }
 
 /// TODO: what if in-memory analytical
-inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std::optional<std::string> old_disk_key) {
-  transaction->EnsureCommitTimestampExists();
-  return &transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), transaction->commit_timestamp.get(),
-                                           old_disk_key);
+inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std::optional<std::string> old_disk_key,
+                                                  const std::string &ts) {
+  // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
+  return &transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), std::stoull(ts), old_disk_key);
+}
+
+inline Delta *CreateDeleteDeserializedIndexObjectDelta(Transaction *transaction, std::list<Delta> &deltas,
+                                                       std::optional<std::string> old_disk_key, const uint64_t ts) {
+  return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts, old_disk_key);
 }
 
 /// TODO: what if in-memory analytical
 inline Delta *CreateDeleteDeserializedIndexObjectDelta(Transaction *transaction, std::list<Delta> &deltas,
-                                                       std::optional<std::string> old_disk_key) {
-  transaction->EnsureCommitTimestampExists();
-  return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), transaction->commit_timestamp.get(), old_disk_key);
+                                                       std::optional<std::string> old_disk_key, const std::string &ts) {
+  // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
+  return CreateDeleteDeserializedIndexObjectDelta(transaction, deltas, old_disk_key, std::stoull(ts));
 }
 
 /// This function creates a delta in the transaction for the object and links
diff --git a/src/utils/disk_utils.hpp b/src/utils/disk_utils.hpp
index 0372a38bb..06b673419 100644
--- a/src/utils/disk_utils.hpp
+++ b/src/utils/disk_utils.hpp
@@ -25,4 +25,12 @@ inline std::optional<std::string> GetOldDiskKeyOrNull(storage::Delta *head) {
   return std::nullopt;
 }
 
+inline uint64_t GetEarliestTimestamp(storage::Delta *head) {
+  if (head == nullptr) return 0;
+  while (head->next != nullptr) {
+    head = head->next;
+  }
+  return head->timestamp->load(std::memory_order_acquire);
+}
+
 }  // namespace memgraph::utils