Add timestamp to DELETE_DESERIALIZED_OBJECT delta at which this object was created. (#1179)
Add to each DELETE_DESERIALIZED_OBJECT delta the timestamp at which its object was created. RocksDB currently doesn't provide timestamp() functionality in the iterators of TransactionDB. Because of that, we are using a constant "0" timestamp for DELETE_DESERIALIZED_OBJECT deltas for now.
This commit is contained in:
parent
4b3ba908c7
commit
5f509532f2
@ -154,9 +154,14 @@ struct Delta {
|
||||
struct RemoveInEdgeTag {};
|
||||
struct RemoveOutEdgeTag {};
|
||||
|
||||
Delta(DeleteDeserializedObjectTag /*tag*/, std::atomic<uint64_t> *timestamp,
|
||||
const std::optional<std::string> &old_disk_key)
|
||||
: action(Action::DELETE_DESERIALIZED_OBJECT), timestamp(timestamp), command_id(0), old_disk_key(old_disk_key) {}
|
||||
// DELETE_DESERIALIZED_OBJECT is used to load data from disk that was committed by past txs.
|
||||
// Because the object was created by a past tx, we create the timestamp ourselves here instead of taking it from
|
||||
// the current tx. The timestamp value comes from the RocksDB timestamp stored in the key.
|
||||
Delta(DeleteDeserializedObjectTag /*tag*/, uint64_t ts, const std::optional<std::string> &old_disk_key)
|
||||
: action(Action::DELETE_DESERIALIZED_OBJECT),
|
||||
timestamp(new std::atomic<uint64_t>(ts)),
|
||||
command_id(0),
|
||||
old_disk_key(old_disk_key) {}
|
||||
|
||||
Delta(DeleteObjectTag /*tag*/, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(Action::DELETE_OBJECT), timestamp(timestamp), command_id(command_id) {}
|
||||
@ -224,6 +229,8 @@ struct Delta {
|
||||
break;
|
||||
case Action::DELETE_DESERIALIZED_OBJECT:
|
||||
old_disk_key.reset();
|
||||
delete timestamp;
|
||||
timestamp = nullptr;
|
||||
break;
|
||||
case Action::SET_PROPERTY:
|
||||
property.value.~PropertyValue();
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <limits>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <vector>
|
||||
|
||||
@ -59,6 +60,7 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr const char *deserializeTimestamp = "0";
|
||||
constexpr const char *vertexHandle = "vertex";
|
||||
constexpr const char *edgeHandle = "edge";
|
||||
constexpr const char *defaultHandle = "default";
|
||||
@ -311,8 +313,9 @@ DiskStorage::DiskAccessor::~DiskAccessor() {
|
||||
}
|
||||
|
||||
/// NOTE: This will create Delta object which will cause deletion of old key entry on the disk
|
||||
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMainMemoryCache(std::string &&key,
|
||||
std::string &&value) {
|
||||
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMainMemoryCache(const std::string &key,
|
||||
const std::string &value,
|
||||
const std::string &ts) {
|
||||
auto main_storage_accessor = vertices_.access();
|
||||
|
||||
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key)));
|
||||
@ -322,7 +325,7 @@ std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMa
|
||||
std::vector<LabelId> labels_id{utils::DeserializeLabelsFromMainDiskStorage(key)};
|
||||
PropertyStore properties{utils::DeserializePropertiesFromMainDiskStorage(value)};
|
||||
return CreateVertex(main_storage_accessor, gid, std::move(labels_id), std::move(properties),
|
||||
CreateDeleteDeserializedObjectDelta(&transaction_, key));
|
||||
CreateDeleteDeserializedObjectDelta(&transaction_, key, ts));
|
||||
}
|
||||
|
||||
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLabelIndexCache(
|
||||
@ -354,7 +357,8 @@ std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLa
|
||||
}
|
||||
|
||||
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::DeserializeEdge(const rocksdb::Slice &key,
|
||||
const rocksdb::Slice &value) {
|
||||
const rocksdb::Slice &value,
|
||||
const rocksdb::Slice &ts) {
|
||||
const auto edge_parts = utils::Split(key.ToStringView(), "|");
|
||||
const Gid edge_gid = Gid::FromUint(std::stoull(edge_parts[4]));
|
||||
|
||||
@ -380,7 +384,8 @@ std::optional<EdgeAccessor> DiskStorage::DiskAccessor::DeserializeEdge(const roc
|
||||
throw utils::BasicException("Non-existing vertices found during edge deserialization");
|
||||
}
|
||||
const auto edge_type_id = storage::EdgeTypeId::FromUint(std::stoull(edge_parts[3]));
|
||||
auto maybe_edge = CreateEdge(&*from_acc, &*to_acc, edge_type_id, edge_gid, value.ToStringView(), key.ToString());
|
||||
auto maybe_edge =
|
||||
CreateEdge(&*from_acc, &*to_acc, edge_type_id, edge_gid, value.ToStringView(), key.ToString(), ts.ToString());
|
||||
MG_ASSERT(maybe_edge.HasValue());
|
||||
|
||||
return *maybe_edge;
|
||||
@ -399,7 +404,9 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) {
|
||||
auto it =
|
||||
std::unique_ptr<rocksdb::Iterator>(disk_transaction_->GetIterator(ro, disk_storage->kvstore_->vertex_chandle));
|
||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||
LoadVertexToMainMemoryCache(it->key().ToString(), it->value().ToString());
|
||||
// We should pass it->timestamp().ToString() instead of deserializeTimestamp
|
||||
// This is hack until RocksDB will support timestamp() in WBWI iterator
|
||||
LoadVertexToMainMemoryCache(it->key().ToString(), it->value().ToString(), deserializeTimestamp);
|
||||
}
|
||||
scanned_all_vertices_ = true;
|
||||
return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_,
|
||||
@ -430,11 +437,13 @@ std::unordered_set<Gid> DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWit
|
||||
if (VertexHasLabel(vertex, label, &transaction_, view)) {
|
||||
spdlog::trace("Loaded vertex with gid: {} from main index storage to label index",
|
||||
utils::SerializeIdType(vertex.gid));
|
||||
uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
|
||||
/// TODO: here we are doing serialization and then, later, deserialization again -> expensive
|
||||
LoadVertexToLabelIndexCache(label, utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
|
||||
indexed_vertices->access());
|
||||
LoadVertexToLabelIndexCache(
|
||||
label, utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts),
|
||||
indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
return gids;
|
||||
@ -460,9 +469,12 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelIndex(LabelId label,
|
||||
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key)));
|
||||
spdlog::trace("Loaded vertex with key: {} from label index storage", key);
|
||||
if (key.starts_with(serialized_label) && !utils::Contains(gids, curr_gid)) {
|
||||
LoadVertexToLabelIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
|
||||
indexed_vertices->access());
|
||||
// We should pass it->timestamp().ToString() instead of deserializeTimestamp
|
||||
// This is hack until RocksDB will support timestamp() in WBWI iterator
|
||||
LoadVertexToLabelIndexCache(
|
||||
label, index_it->key().ToString(), index_it->value().ToString(),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp),
|
||||
indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -505,10 +517,11 @@ std::unordered_set<Gid> DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWit
|
||||
gids.insert(vertex.gid);
|
||||
/// TODO: delta support for clearing old disk keys
|
||||
if (label_property_filter(vertex, label, property, view)) {
|
||||
uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
|
||||
LoadVertexToLabelPropertyIndexCache(
|
||||
label, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts),
|
||||
indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
@ -538,9 +551,12 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndex(LabelId l
|
||||
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key)));
|
||||
/// TODO: optimize
|
||||
if (label_property_filter(key, label_property_prefix, gids, curr_gid)) {
|
||||
LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
|
||||
indexed_vertices->access());
|
||||
// We should pass it->timestamp().ToString() instead of deserializeTimestamp
|
||||
// This is hack until RocksDB will support timestamp() in WBWI iterator
|
||||
LoadVertexToLabelPropertyIndexCache(
|
||||
label, index_it->key().ToString(), index_it->value().ToString(),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp),
|
||||
indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -590,9 +606,12 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexWithPointV
|
||||
PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value);
|
||||
if (key.starts_with(label_property_prefix) && !utils::Contains(gids, curr_gid) &&
|
||||
properties.IsPropertyEqual(property, value)) {
|
||||
LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
|
||||
indexed_vertices->access());
|
||||
// We should pass it->timestamp().ToString() instead of deserializeTimestamp
|
||||
// This is hack until RocksDB will support timestamp() in WBWI iterator
|
||||
LoadVertexToLabelPropertyIndexCache(
|
||||
label, index_it->key().ToString(), index_it->value().ToString(),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp),
|
||||
indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -630,10 +649,11 @@ DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelPropertyIndexCache
|
||||
auto prop_value = GetVertexProperty(vertex, property, &transaction_, view);
|
||||
if (VertexHasLabel(vertex, label, &transaction_, view) &&
|
||||
IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
|
||||
uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
|
||||
LoadVertexToLabelPropertyIndexCache(
|
||||
label, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts),
|
||||
indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
@ -669,9 +689,12 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexForInterva
|
||||
!IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
|
||||
continue;
|
||||
}
|
||||
LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str),
|
||||
indexed_vertices->access());
|
||||
// We should pass it->timestamp().ToString() instead of deserializeTimestamp
|
||||
// This is hack until RocksDB will support timestamp() in WBWI iterator
|
||||
LoadVertexToLabelPropertyIndexCache(
|
||||
label, index_it->key().ToString(), index_it->value().ToString(),
|
||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str, deserializeTimestamp),
|
||||
indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
|
||||
@ -841,7 +864,9 @@ std::optional<VertexAccessor> DiskStorage::DiskAccessor::FindVertex(storage::Gid
|
||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||
std::string key = it->key().ToString();
|
||||
if (Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key))) == gid) {
|
||||
return LoadVertexToMainMemoryCache(std::move(key), it->value().ToString());
|
||||
// We should pass it->timestamp().ToString() instead of deserializeTimestamp
|
||||
// This is hack until RocksDB will support timestamp() in WBWI iterator
|
||||
return LoadVertexToMainMemoryCache(key, it->value().ToString(), deserializeTimestamp);
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
@ -955,7 +980,9 @@ void DiskStorage::DiskAccessor::PrefetchEdges(const VertexAccessor &vertex_acc,
|
||||
const rocksdb::Slice &key = it->key();
|
||||
auto keyStr = key.ToStringView();
|
||||
if (PrefetchEdgeFilter(keyStr, vertex_acc, edge_direction)) {
|
||||
DeserializeEdge(key, it->value());
|
||||
// We should pass it->timestamp().ToString() instead of deserializeTimestamp
|
||||
// This is hack until RocksDB will support timestamp() in WBWI iterator
|
||||
DeserializeEdge(key, it->value(), deserializeTimestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -971,7 +998,8 @@ void DiskStorage::DiskAccessor::PrefetchOutEdges(const VertexAccessor &vertex_ac
|
||||
Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(const VertexAccessor *from, const VertexAccessor *to,
|
||||
EdgeTypeId edge_type, storage::Gid gid,
|
||||
const std::string_view properties,
|
||||
const std::string &old_disk_key) {
|
||||
const std::string &old_disk_key,
|
||||
const std::string &read_ts) {
|
||||
OOMExceptionEnabler oom_exception;
|
||||
auto *from_vertex = from->vertex_;
|
||||
auto *to_vertex = to->vertex_;
|
||||
@ -985,7 +1013,7 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(const VertexAccessor
|
||||
EdgeRef edge(gid);
|
||||
if (config_.properties_on_edges) {
|
||||
auto acc = edges_.access();
|
||||
auto *delta = CreateDeleteDeserializedObjectDelta(&transaction_, old_disk_key);
|
||||
auto *delta = CreateDeleteDeserializedObjectDelta(&transaction_, old_disk_key, read_ts);
|
||||
auto [it, inserted] = acc.insert(Edge(gid, delta));
|
||||
MG_ASSERT(inserted, "The edge must be inserted here!");
|
||||
MG_ASSERT(it != acc.end(), "Invalid Edge accessor!");
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "utils/rw_lock.hpp"
|
||||
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/slice.h>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace memgraph::storage {
|
||||
@ -200,13 +201,15 @@ class DiskStorage final : public Storage {
|
||||
LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
|
||||
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
||||
|
||||
std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(std::string &&key, std::string &&value);
|
||||
std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(const std::string &key, const std::string &value,
|
||||
const std::string &ts);
|
||||
|
||||
std::optional<storage::VertexAccessor> LoadVertexToLabelPropertyIndexCache(
|
||||
LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
|
||||
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
||||
|
||||
std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value);
|
||||
std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value,
|
||||
const rocksdb::Slice &ts);
|
||||
|
||||
private:
|
||||
VertexAccessor CreateVertex(utils::SkipList<Vertex>::Accessor &accessor, storage::Gid gid,
|
||||
@ -217,7 +220,8 @@ class DiskStorage final : public Storage {
|
||||
void PrefetchEdges(const VertexAccessor &vertex_acc, EdgeDirection edge_direction);
|
||||
|
||||
Result<EdgeAccessor> CreateEdge(const VertexAccessor *from, const VertexAccessor *to, EdgeTypeId edge_type,
|
||||
storage::Gid gid, std::string_view properties, const std::string &old_disk_key);
|
||||
storage::Gid gid, std::string_view properties, const std::string &old_disk_key,
|
||||
const std::string &ts);
|
||||
|
||||
/// Flushes vertices and edges to the disk with the commit timestamp.
|
||||
/// At the time of calling, the commit_timestamp_ must already exist.
|
||||
|
@ -114,17 +114,22 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
|
||||
}
|
||||
|
||||
/// TODO: what if in-memory analytical
|
||||
inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std::optional<std::string> old_disk_key) {
|
||||
transaction->EnsureCommitTimestampExists();
|
||||
return &transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), transaction->commit_timestamp.get(),
|
||||
old_disk_key);
|
||||
/// Appends a DELETE_DESERIALIZED_OBJECT delta to the transaction's delta list.
///
/// @param transaction  transaction whose `deltas` container receives the new delta.
/// @param old_disk_key disk key of the deserialized object, forwarded to the delta (may be empty).
/// @param ts           timestamp in textual form; parsed with std::stoull, so it must hold a
///                     decimal unsigned integer (std::stoull throws otherwise).
/// @return pointer to the delta that was just appended.
inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std::optional<std::string> old_disk_key,
                                                  const std::string &ts) {
  // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
  auto &appended_delta =
      transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), std::stoull(ts), old_disk_key);
  return &appended_delta;
}
|
||||
|
||||
/// Appends a DELETE_DESERIALIZED_OBJECT delta to the caller-supplied `deltas` list.
///
/// @param transaction  not read by this overload; kept so both overloads share the same leading parameters.
/// @param deltas       list that receives the new delta.
/// @param old_disk_key disk key forwarded to the delta (may be empty).
/// @param ts           timestamp value stored in the new delta.
/// @return pointer to the delta that was just appended.
inline Delta *CreateDeleteDeserializedIndexObjectDelta(Transaction *transaction, std::list<Delta> &deltas,
                                                       std::optional<std::string> old_disk_key, const uint64_t ts) {
  auto &index_delta = deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts, old_disk_key);
  return &index_delta;
}
|
||||
|
||||
/// TODO: what if in-memory analytical
|
||||
inline Delta *CreateDeleteDeserializedIndexObjectDelta(Transaction *transaction, std::list<Delta> &deltas,
|
||||
std::optional<std::string> old_disk_key) {
|
||||
transaction->EnsureCommitTimestampExists();
|
||||
return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), transaction->commit_timestamp.get(), old_disk_key);
|
||||
std::optional<std::string> old_disk_key, const std::string &ts) {
|
||||
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
|
||||
return CreateDeleteDeserializedIndexObjectDelta(transaction, deltas, old_disk_key, std::stoull(ts));
|
||||
}
|
||||
|
||||
/// This function creates a delta in the transaction for the object and links
|
||||
|
@ -25,4 +25,12 @@ inline std::optional<std::string> GetOldDiskKeyOrNull(storage::Delta *head) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/// Returns the timestamp stored in the last delta of the chain that starts at `head`.
///
/// The chain is followed through `next` until a delta with no successor is reached, and that
/// delta's timestamp is read with acquire ordering.
///
/// @param head first delta of the chain; may be nullptr.
/// @return the last delta's timestamp, or 0 when `head` is nullptr.
inline uint64_t GetEarliestTimestamp(storage::Delta *head) {
  if (head != nullptr) {
    // Advance until the current delta has no successor; the loop body is intentionally empty.
    for (; head->next != nullptr; head = head->next) {
    }
    return head->timestamp->load(std::memory_order_acquire);
  }
  return 0;
}
|
||||
|
||||
} // namespace memgraph::utils
|
||||
|
Loading…
Reference in New Issue
Block a user