Improve disk indices (#1139)
This commit is contained in:
parent
271b1a5ddb
commit
762fe6a65d
@ -124,6 +124,10 @@ bool DiskLabelIndex::ClearDeletedVertex(std::string_view gid, uint64_t transacti
|
|||||||
|
|
||||||
bool DiskLabelIndex::DeleteVerticesWithRemovedIndexingLabel(uint64_t transaction_start_timestamp,
|
bool DiskLabelIndex::DeleteVerticesWithRemovedIndexingLabel(uint64_t transaction_start_timestamp,
|
||||||
uint64_t transaction_commit_timestamp) {
|
uint64_t transaction_commit_timestamp) {
|
||||||
|
if (entries_for_deletion->empty()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
auto disk_transaction = CreateAllReadingRocksDBTransaction();
|
auto disk_transaction = CreateAllReadingRocksDBTransaction();
|
||||||
|
|
||||||
rocksdb::ReadOptions ro;
|
rocksdb::ReadOptions ro;
|
||||||
|
@ -131,6 +131,9 @@ bool DiskLabelPropertyIndex::ClearDeletedVertex(std::string_view gid, uint64_t t
|
|||||||
|
|
||||||
bool DiskLabelPropertyIndex::DeleteVerticesWithRemovedIndexingLabel(uint64_t transaction_start_timestamp,
|
bool DiskLabelPropertyIndex::DeleteVerticesWithRemovedIndexingLabel(uint64_t transaction_start_timestamp,
|
||||||
uint64_t transaction_commit_timestamp) {
|
uint64_t transaction_commit_timestamp) {
|
||||||
|
if (entries_for_deletion->empty()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
auto disk_transaction = CreateAllReadingRocksDBTransaction();
|
auto disk_transaction = CreateAllReadingRocksDBTransaction();
|
||||||
|
|
||||||
rocksdb::ReadOptions ro;
|
rocksdb::ReadOptions ro;
|
||||||
|
@ -41,6 +41,7 @@
|
|||||||
#include "utils/disk_utils.hpp"
|
#include "utils/disk_utils.hpp"
|
||||||
#include "utils/exceptions.hpp"
|
#include "utils/exceptions.hpp"
|
||||||
#include "utils/file.hpp"
|
#include "utils/file.hpp"
|
||||||
|
#include "utils/logging.hpp"
|
||||||
#include "utils/memory_tracker.hpp"
|
#include "utils/memory_tracker.hpp"
|
||||||
#include "utils/message.hpp"
|
#include "utils/message.hpp"
|
||||||
#include "utils/on_scope_exit.hpp"
|
#include "utils/on_scope_exit.hpp"
|
||||||
@ -156,7 +157,7 @@ bool HasVertexProperty(const Vertex &vertex, PropertyId property, Transaction *t
|
|||||||
return !GetVertexProperty(vertex, property, transaction, view).IsNull();
|
return !GetVertexProperty(vertex, property, transaction, view).IsNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool HasVertexEqualPropertyValue(const Vertex &vertex, PropertyId property_id, PropertyValue property_value,
|
bool VertexHasEqualPropertyValue(const Vertex &vertex, PropertyId property_id, PropertyValue property_value,
|
||||||
Transaction *transaction, View view) {
|
Transaction *transaction, View view) {
|
||||||
return GetVertexProperty(vertex, property_id, transaction, view) == property_value;
|
return GetVertexProperty(vertex, property_id, transaction, view) == property_value;
|
||||||
}
|
}
|
||||||
@ -305,47 +306,43 @@ DiskStorage::DiskAccessor::~DiskAccessor() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// NOTE: This will create Delta object which will cause deletion of old key entry on the disk
|
/// NOTE: This will create Delta object which will cause deletion of old key entry on the disk
|
||||||
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMainMemoryCache(
|
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMainMemoryCache(std::string &&key,
|
||||||
const rocksdb::Slice &key, const rocksdb::Slice &value) {
|
std::string &&value) {
|
||||||
auto main_storage_accessor = vertices_.access();
|
auto main_storage_accessor = vertices_.access();
|
||||||
|
|
||||||
const std::string key_str = key.ToString();
|
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key)));
|
||||||
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key_str)));
|
|
||||||
if (VertexExistsInCache(main_storage_accessor, gid)) {
|
if (VertexExistsInCache(main_storage_accessor, gid)) {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
std::vector<LabelId> labels_id = utils::DeserializeLabelsFromMainDiskStorage(key_str);
|
std::vector<LabelId> labels_id = utils::DeserializeLabelsFromMainDiskStorage(key);
|
||||||
return CreateVertex(main_storage_accessor, gid, labels_id,
|
return CreateVertex(main_storage_accessor, gid, labels_id, utils::DeserializePropertiesFromMainDiskStorage(value),
|
||||||
utils::DeserializePropertiesFromMainDiskStorage(value.ToStringView()),
|
CreateDeleteDeserializedObjectDelta(&transaction_, key));
|
||||||
CreateDeleteDeserializedObjectDelta(&transaction_, key.ToString()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLabelIndexCache(
|
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLabelIndexCache(
|
||||||
const rocksdb::Slice &key, const rocksdb::Slice &value, Delta *index_delta,
|
std::string &&key, std::string &&value, Delta *index_delta,
|
||||||
utils::SkipList<storage::Vertex>::Accessor index_accessor) {
|
utils::SkipList<storage::Vertex>::Accessor index_accessor) {
|
||||||
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key.ToString())));
|
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key)));
|
||||||
if (VertexExistsInCache(index_accessor, gid)) {
|
if (VertexExistsInCache(index_accessor, gid)) {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::string value_str{value.ToString()};
|
std::vector<LabelId> labels_id{utils::DeserializeLabelsFromLabelIndexStorage(value)};
|
||||||
const auto labels{utils::DeserializeLabelsFromLabelIndexStorage(value_str)};
|
PropertyStore properties{utils::DeserializePropertiesFromLabelIndexStorage(value)};
|
||||||
return CreateVertex(index_accessor, gid, labels, utils::DeserializePropertiesFromLabelIndexStorage(value_str),
|
return CreateVertex(index_accessor, gid, labels_id, std::move(properties), index_delta);
|
||||||
index_delta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLabelPropertyIndexCache(
|
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLabelPropertyIndexCache(
|
||||||
const rocksdb::Slice &key, const rocksdb::Slice &value, Delta *index_delta,
|
std::string &&key, std::string &&value, Delta *index_delta,
|
||||||
utils::SkipList<storage::Vertex>::Accessor index_accessor) {
|
utils::SkipList<storage::Vertex>::Accessor index_accessor) {
|
||||||
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key.ToString())));
|
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key)));
|
||||||
if (VertexExistsInCache(index_accessor, gid)) {
|
if (VertexExistsInCache(index_accessor, gid)) {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::string value_str{value.ToString()};
|
std::vector<LabelId> labels_id{utils::DeserializeLabelsFromLabelPropertyIndexStorage(value)};
|
||||||
const auto labels{utils::DeserializeLabelsFromLabelPropertyIndexStorage(value_str)};
|
PropertyStore properties{utils::DeserializePropertiesFromLabelPropertyIndexStorage(value)};
|
||||||
return CreateVertex(index_accessor, gid, labels,
|
return CreateVertex(index_accessor, gid, labels_id, std::move(properties), index_delta);
|
||||||
utils::DeserializePropertiesFromLabelPropertyIndexStorage(value.ToString()), index_delta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::DeserializeEdge(const rocksdb::Slice &key,
|
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::DeserializeEdge(const rocksdb::Slice &key,
|
||||||
@ -390,7 +387,7 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) {
|
|||||||
auto it =
|
auto it =
|
||||||
std::unique_ptr<rocksdb::Iterator>(disk_transaction_->GetIterator(ro, disk_storage->kvstore_->vertex_chandle));
|
std::unique_ptr<rocksdb::Iterator>(disk_transaction_->GetIterator(ro, disk_storage->kvstore_->vertex_chandle));
|
||||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||||
LoadVertexToMainMemoryCache(it->key(), it->value());
|
LoadVertexToMainMemoryCache(it->key().ToString(), it->value().ToString());
|
||||||
}
|
}
|
||||||
return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_,
|
return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_,
|
||||||
&storage_->constraints_, storage_->config_.items));
|
&storage_->constraints_, storage_->config_.items));
|
||||||
@ -399,24 +396,26 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) {
|
|||||||
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, View view) {
|
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, View view) {
|
||||||
index_storage_.emplace_back(std::make_unique<utils::SkipList<storage::Vertex>>());
|
index_storage_.emplace_back(std::make_unique<utils::SkipList<storage::Vertex>>());
|
||||||
auto &indexed_vertices = index_storage_.back();
|
auto &indexed_vertices = index_storage_.back();
|
||||||
index_deltas_storage_.emplace_back(std::list<Delta>());
|
index_deltas_storage_.emplace_back();
|
||||||
auto &index_deltas = index_deltas_storage_.back();
|
auto &index_deltas = index_deltas_storage_.back();
|
||||||
|
|
||||||
auto *disk_label_index = static_cast<DiskLabelIndex *>(storage_->indices_.label_index_.get());
|
auto gids = MergeVerticesFromMainCacheWithLabelIndexCache(label, view, index_deltas, indexed_vertices.get());
|
||||||
auto disk_index_transaction = disk_label_index->CreateRocksDBTransaction();
|
LoadVerticesFromDiskLabelIndex(label, gids, index_deltas, indexed_vertices.get());
|
||||||
disk_index_transaction->SetReadTimestampForValidation(transaction_.start_timestamp);
|
|
||||||
rocksdb::ReadOptions ro;
|
|
||||||
std::string strTs = utils::StringTimestamp(transaction_.start_timestamp);
|
|
||||||
rocksdb::Slice ts(strTs);
|
|
||||||
ro.timestamp = &ts;
|
|
||||||
auto index_it = std::unique_ptr<rocksdb::Iterator>(disk_index_transaction->GetIterator(ro));
|
|
||||||
|
|
||||||
|
return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_,
|
||||||
|
&storage_->constraints_, storage_->config_.items));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unordered_set<Gid> DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelIndexCache(
|
||||||
|
LabelId label, View view, std::list<Delta> &index_deltas, utils::SkipList<Vertex> *indexed_vertices) {
|
||||||
auto main_cache_acc = vertices_.access();
|
auto main_cache_acc = vertices_.access();
|
||||||
std::unordered_set<storage::Gid> gids(main_cache_acc.size());
|
std::unordered_set<Gid> gids;
|
||||||
|
gids.reserve(main_cache_acc.size());
|
||||||
|
|
||||||
for (const auto &vertex : main_cache_acc) {
|
for (const auto &vertex : main_cache_acc) {
|
||||||
gids.insert(vertex.gid);
|
gids.insert(vertex.gid);
|
||||||
if (VertexHasLabel(vertex, label, &transaction_, view)) {
|
if (VertexHasLabel(vertex, label, &transaction_, view)) {
|
||||||
spdlog::debug("Loaded vertex with gid: {} from main index storage to label index",
|
spdlog::trace("Loaded vertex with gid: {} from main index storage to label index",
|
||||||
utils::SerializeIdType(vertex.gid));
|
utils::SerializeIdType(vertex.gid));
|
||||||
LoadVertexToLabelIndexCache(utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
|
LoadVertexToLabelIndexCache(utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
|
||||||
utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties),
|
utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties),
|
||||||
@ -424,29 +423,90 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, View view) {
|
|||||||
indexed_vertices->access());
|
indexed_vertices->access());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return gids;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelIndex(LabelId label,
|
||||||
|
const std::unordered_set<storage::Gid> &gids,
|
||||||
|
std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices) {
|
||||||
|
auto *disk_label_index = static_cast<DiskLabelIndex *>(storage_->indices_.label_index_.get());
|
||||||
|
auto disk_index_transaction = disk_label_index->CreateRocksDBTransaction();
|
||||||
|
disk_index_transaction->SetReadTimestampForValidation(transaction_.start_timestamp);
|
||||||
|
|
||||||
|
rocksdb::ReadOptions ro;
|
||||||
|
std::string strTs = utils::StringTimestamp(transaction_.start_timestamp);
|
||||||
|
rocksdb::Slice ts(strTs);
|
||||||
|
ro.timestamp = &ts;
|
||||||
|
auto index_it = std::unique_ptr<rocksdb::Iterator>(disk_index_transaction->GetIterator(ro));
|
||||||
|
|
||||||
|
const auto serialized_label = utils::SerializeIdType(label);
|
||||||
for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) {
|
for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) {
|
||||||
std::string key = index_it->key().ToString();
|
std::string key = index_it->key().ToString();
|
||||||
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key)));
|
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key)));
|
||||||
spdlog::debug("Loaded vertex with key: {} from label index storage", key);
|
spdlog::trace("Loaded vertex with key: {} from label index storage", key);
|
||||||
/// TODO: optimize
|
if (key.starts_with(serialized_label) && !utils::Contains(gids, curr_gid)) {
|
||||||
if (key.starts_with(utils::SerializeIdType(label)) && !utils::Contains(gids, curr_gid)) {
|
LoadVertexToLabelIndexCache(index_it->key().ToString(), index_it->value().ToString(),
|
||||||
LoadVertexToLabelIndexCache(index_it->key(), index_it->value(),
|
|
||||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
|
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
|
||||||
indexed_vertices->access());
|
indexed_vertices->access());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_,
|
|
||||||
&storage_->constraints_, storage_->config_.items));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, View view) {
|
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, View view) {
|
||||||
index_storage_.emplace_back(std::make_unique<utils::SkipList<storage::Vertex>>());
|
index_storage_.emplace_back(std::make_unique<utils::SkipList<storage::Vertex>>());
|
||||||
auto &indexed_vertices = index_storage_.back();
|
auto &indexed_vertices = index_storage_.back();
|
||||||
index_deltas_storage_.emplace_back(std::list<Delta>());
|
index_deltas_storage_.emplace_back();
|
||||||
auto &index_deltas = index_deltas_storage_.back();
|
auto &index_deltas = index_deltas_storage_.back();
|
||||||
|
|
||||||
|
const auto label_property_filter = [this](const Vertex &vertex, LabelId label, PropertyId property,
|
||||||
|
View view) -> bool {
|
||||||
|
return VertexHasLabel(vertex, label, &transaction_, view) &&
|
||||||
|
HasVertexProperty(vertex, property, &transaction_, view);
|
||||||
|
};
|
||||||
|
|
||||||
|
const auto gids = MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
|
||||||
|
label, property, view, index_deltas, indexed_vertices.get(), label_property_filter);
|
||||||
|
|
||||||
|
const auto disk_label_property_filter = [](const std::string &key, const std::string &label_property_prefix,
|
||||||
|
const std::unordered_set<Gid> &gids, Gid curr_gid) -> bool {
|
||||||
|
return key.starts_with(label_property_prefix) && !utils::Contains(gids, curr_gid);
|
||||||
|
};
|
||||||
|
|
||||||
|
LoadVerticesFromDiskLabelPropertyIndex(label, property, gids, index_deltas, indexed_vertices.get(),
|
||||||
|
disk_label_property_filter);
|
||||||
|
|
||||||
|
return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_,
|
||||||
|
&storage_->constraints_, storage_->config_.items));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unordered_set<Gid> DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
|
||||||
|
LabelId label, PropertyId property, View view, std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices, const auto &label_property_filter) {
|
||||||
|
auto main_cache_acc = vertices_.access();
|
||||||
|
std::unordered_set<storage::Gid> gids;
|
||||||
|
gids.reserve(main_cache_acc.size());
|
||||||
|
|
||||||
|
for (const auto &vertex : main_cache_acc) {
|
||||||
|
gids.insert(vertex.gid);
|
||||||
|
/// TODO: delta support for clearing old disk keys
|
||||||
|
if (label_property_filter(vertex, label, property, view)) {
|
||||||
|
LoadVertexToLabelPropertyIndexCache(
|
||||||
|
utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
||||||
|
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
||||||
|
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
|
||||||
|
indexed_vertices->access());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return gids;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndex(LabelId label, PropertyId property,
|
||||||
|
const std::unordered_set<storage::Gid> &gids,
|
||||||
|
std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices,
|
||||||
|
const auto &label_property_filter) {
|
||||||
auto *disk_label_property_index =
|
auto *disk_label_property_index =
|
||||||
static_cast<DiskLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
|
static_cast<DiskLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
|
||||||
|
|
||||||
@ -458,47 +518,47 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId p
|
|||||||
ro.timestamp = &ts;
|
ro.timestamp = &ts;
|
||||||
auto index_it = std::unique_ptr<rocksdb::Iterator>(disk_index_transaction->GetIterator(ro));
|
auto index_it = std::unique_ptr<rocksdb::Iterator>(disk_index_transaction->GetIterator(ro));
|
||||||
|
|
||||||
auto main_cache_acc = vertices_.access();
|
const auto label_property_prefix = utils::SerializeIdType(label) + "|" + utils::SerializeIdType(property);
|
||||||
std::unordered_set<storage::Gid> gids(main_cache_acc.size());
|
|
||||||
for (const auto &vertex : main_cache_acc) {
|
|
||||||
gids.insert(vertex.gid);
|
|
||||||
/// TODO: delta support for clearing old disk keys
|
|
||||||
if (VertexHasLabel(vertex, label, &transaction_, view) &&
|
|
||||||
HasVertexProperty(vertex, property, &transaction_, view)) {
|
|
||||||
LoadVertexToLabelPropertyIndexCache(
|
|
||||||
utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
|
||||||
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
|
||||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
|
|
||||||
indexed_vertices->access());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) {
|
for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) {
|
||||||
std::string key = index_it->key().ToString();
|
std::string key = index_it->key().ToString();
|
||||||
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key)));
|
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key)));
|
||||||
/// TODO: optimize
|
/// TODO: optimize
|
||||||
if (key.starts_with(utils::SerializeIdType(label) + "|" + utils::SerializeIdType(property)) &&
|
if (label_property_filter(key, label_property_prefix, gids, curr_gid)) {
|
||||||
!utils::Contains(gids, curr_gid)) {
|
LoadVertexToLabelPropertyIndexCache(index_it->key().ToString(), index_it->value().ToString(),
|
||||||
LoadVertexToLabelPropertyIndexCache(index_it->key(), index_it->value(),
|
|
||||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
|
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
|
||||||
indexed_vertices->access());
|
indexed_vertices->access());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_,
|
|
||||||
&storage_->constraints_, storage_->config_.items));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, const PropertyValue &value,
|
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, const PropertyValue &value,
|
||||||
View view) {
|
View view) {
|
||||||
index_storage_.emplace_back(std::make_unique<utils::SkipList<storage::Vertex>>());
|
index_storage_.emplace_back(std::make_unique<utils::SkipList<storage::Vertex>>());
|
||||||
auto &indexed_vertices = index_storage_.back();
|
auto &indexed_vertices = index_storage_.back();
|
||||||
index_deltas_storage_.emplace_back(std::list<Delta>());
|
index_deltas_storage_.emplace_back();
|
||||||
auto &index_deltas = index_deltas_storage_.back();
|
auto &index_deltas = index_deltas_storage_.back();
|
||||||
|
|
||||||
|
auto label_property_filter = [this, &value](const Vertex &vertex, LabelId label, PropertyId property,
|
||||||
|
View view) -> bool {
|
||||||
|
return VertexHasLabel(vertex, label, &transaction_, view) &&
|
||||||
|
VertexHasEqualPropertyValue(vertex, property, value, &transaction_, view);
|
||||||
|
};
|
||||||
|
|
||||||
|
const auto gids = MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
|
||||||
|
label, property, view, index_deltas, indexed_vertices.get(), label_property_filter);
|
||||||
|
|
||||||
|
LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(label, property, gids, value, index_deltas,
|
||||||
|
indexed_vertices.get());
|
||||||
|
|
||||||
|
return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_,
|
||||||
|
&storage_->constraints_, storage_->config_.items));
|
||||||
|
}
|
||||||
|
|
||||||
|
void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(
|
||||||
|
LabelId label, PropertyId property, const std::unordered_set<storage::Gid> &gids, const PropertyValue &value,
|
||||||
|
std::list<Delta> &index_deltas, utils::SkipList<Vertex> *indexed_vertices) {
|
||||||
auto *disk_label_property_index =
|
auto *disk_label_property_index =
|
||||||
static_cast<DiskLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
|
static_cast<DiskLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
|
||||||
|
|
||||||
auto disk_index_transaction = disk_label_property_index->CreateRocksDBTransaction();
|
auto disk_index_transaction = disk_label_property_index->CreateRocksDBTransaction();
|
||||||
disk_index_transaction->SetReadTimestampForValidation(transaction_.start_timestamp);
|
disk_index_transaction->SetReadTimestampForValidation(transaction_.start_timestamp);
|
||||||
rocksdb::ReadOptions ro;
|
rocksdb::ReadOptions ro;
|
||||||
@ -507,37 +567,20 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId p
|
|||||||
ro.timestamp = &ts;
|
ro.timestamp = &ts;
|
||||||
auto index_it = std::unique_ptr<rocksdb::Iterator>(disk_index_transaction->GetIterator(ro));
|
auto index_it = std::unique_ptr<rocksdb::Iterator>(disk_index_transaction->GetIterator(ro));
|
||||||
|
|
||||||
auto main_cache_acc = vertices_.access();
|
const auto label_property_prefix = utils::SerializeIdType(label) + "|" + utils::SerializeIdType(property);
|
||||||
std::unordered_set<storage::Gid> gids(main_cache_acc.size());
|
|
||||||
for (const auto &vertex : main_cache_acc) {
|
|
||||||
gids.insert(vertex.gid);
|
|
||||||
if (VertexHasLabel(vertex, label, &transaction_, view) &&
|
|
||||||
HasVertexEqualPropertyValue(vertex, property, value, &transaction_, view)) {
|
|
||||||
LoadVertexToLabelPropertyIndexCache(
|
|
||||||
utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
|
||||||
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
|
||||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
|
|
||||||
indexed_vertices->access());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) {
|
for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) {
|
||||||
std::string key_str = index_it->key().ToString();
|
std::string key = index_it->key().ToString();
|
||||||
std::string it_value_str = index_it->value().ToString();
|
std::string it_value = index_it->value().ToString();
|
||||||
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key_str)));
|
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key)));
|
||||||
/// TODO: optimize
|
/// TODO: optimize
|
||||||
/// TODO: couple this condition
|
PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value);
|
||||||
PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value_str);
|
if (key.starts_with(label_property_prefix) && !utils::Contains(gids, curr_gid) &&
|
||||||
if (key_str.starts_with(utils::SerializeIdType(label) + "|" + utils::SerializeIdType(property)) &&
|
properties.IsPropertyEqual(property, value)) {
|
||||||
!utils::Contains(gids, curr_gid) && properties.IsPropertyEqual(property, value)) {
|
LoadVertexToLabelPropertyIndexCache(index_it->key().ToString(), index_it->value().ToString(),
|
||||||
LoadVertexToLabelPropertyIndexCache(
|
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
|
||||||
index_it->key(), index_it->value(),
|
indexed_vertices->access());
|
||||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str), indexed_vertices->access());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_,
|
|
||||||
&storage_->constraints_, storage_->config_.items));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property,
|
VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property,
|
||||||
@ -546,25 +589,30 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId p
|
|||||||
View view) {
|
View view) {
|
||||||
index_storage_.emplace_back(std::make_unique<utils::SkipList<storage::Vertex>>());
|
index_storage_.emplace_back(std::make_unique<utils::SkipList<storage::Vertex>>());
|
||||||
auto &indexed_vertices = index_storage_.back();
|
auto &indexed_vertices = index_storage_.back();
|
||||||
index_deltas_storage_.emplace_back(std::list<Delta>());
|
index_deltas_storage_.emplace_back();
|
||||||
auto &index_deltas = index_deltas_storage_.back();
|
auto &index_deltas = index_deltas_storage_.back();
|
||||||
|
|
||||||
auto *disk_label_property_index =
|
const auto gids = MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch(
|
||||||
static_cast<DiskLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
|
label, property, view, lower_bound, upper_bound, index_deltas, indexed_vertices.get());
|
||||||
|
|
||||||
auto disk_index_transaction = disk_label_property_index->CreateRocksDBTransaction();
|
LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(label, property, gids, lower_bound, upper_bound, index_deltas,
|
||||||
disk_index_transaction->SetReadTimestampForValidation(transaction_.start_timestamp);
|
indexed_vertices.get());
|
||||||
rocksdb::ReadOptions ro;
|
|
||||||
std::string strTs = utils::StringTimestamp(transaction_.start_timestamp);
|
|
||||||
rocksdb::Slice ts(strTs);
|
|
||||||
ro.timestamp = &ts;
|
|
||||||
auto index_it = std::unique_ptr<rocksdb::Iterator>(disk_index_transaction->GetIterator(ro));
|
|
||||||
|
|
||||||
|
return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_,
|
||||||
|
&storage_->constraints_, storage_->config_.items));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unordered_set<Gid>
|
||||||
|
DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch(
|
||||||
|
LabelId label, PropertyId property, View view, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||||
|
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices) {
|
||||||
auto main_cache_acc = vertices_.access();
|
auto main_cache_acc = vertices_.access();
|
||||||
std::unordered_set<storage::Gid> gids(main_cache_acc.size());
|
std::unordered_set<storage::Gid> gids;
|
||||||
|
gids.reserve(main_cache_acc.size());
|
||||||
|
|
||||||
for (const auto &vertex : main_cache_acc) {
|
for (const auto &vertex : main_cache_acc) {
|
||||||
gids.insert(vertex.gid);
|
gids.insert(vertex.gid);
|
||||||
/// TODO: refactor in one method
|
|
||||||
auto prop_value = GetVertexProperty(vertex, property, &transaction_, view);
|
auto prop_value = GetVertexProperty(vertex, property, &transaction_, view);
|
||||||
if (VertexHasLabel(vertex, label, &transaction_, view) &&
|
if (VertexHasLabel(vertex, label, &transaction_, view) &&
|
||||||
IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
|
IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
|
||||||
@ -575,25 +623,42 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId p
|
|||||||
indexed_vertices->access());
|
indexed_vertices->access());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return gids;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(
|
||||||
|
LabelId label, PropertyId property, const std::unordered_set<storage::Gid> &gids,
|
||||||
|
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||||
|
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices) {
|
||||||
|
auto *disk_label_property_index =
|
||||||
|
static_cast<DiskLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
|
||||||
|
|
||||||
|
auto disk_index_transaction = disk_label_property_index->CreateRocksDBTransaction();
|
||||||
|
disk_index_transaction->SetReadTimestampForValidation(transaction_.start_timestamp);
|
||||||
|
rocksdb::ReadOptions ro;
|
||||||
|
std::string strTs = utils::StringTimestamp(transaction_.start_timestamp);
|
||||||
|
rocksdb::Slice ts(strTs);
|
||||||
|
ro.timestamp = &ts;
|
||||||
|
auto index_it = std::unique_ptr<rocksdb::Iterator>(disk_index_transaction->GetIterator(ro));
|
||||||
|
|
||||||
|
const std::string label_property_prefix = utils::SerializeIdType(label) + "|" + utils::SerializeIdType(property);
|
||||||
for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) {
|
for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) {
|
||||||
std::string key_str = index_it->key().ToString();
|
std::string key_str = index_it->key().ToString();
|
||||||
std::string it_value_str = index_it->value().ToString();
|
std::string it_value_str = index_it->value().ToString();
|
||||||
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key_str)));
|
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key_str)));
|
||||||
// TODO: andi this will be optimized bla bla
|
/// TODO: andi this will be optimized
|
||||||
/// TODO: couple this condition
|
/// TODO: couple this condition
|
||||||
PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value_str);
|
PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value_str);
|
||||||
auto prop_value = properties.GetProperty(property);
|
auto prop_value = properties.GetProperty(property);
|
||||||
if (key_str.starts_with(utils::SerializeIdType(label) + "|" + utils::SerializeIdType(property)) &&
|
if (!key_str.starts_with(label_property_prefix) || utils::Contains(gids, curr_gid) ||
|
||||||
!utils::Contains(gids, curr_gid) && IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
|
!IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
|
||||||
LoadVertexToLabelPropertyIndexCache(
|
continue;
|
||||||
index_it->key(), index_it->value(),
|
|
||||||
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str), indexed_vertices->access());
|
|
||||||
}
|
}
|
||||||
|
LoadVertexToLabelPropertyIndexCache(index_it->key().ToString(), index_it->value().ToString(),
|
||||||
|
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str),
|
||||||
|
indexed_vertices->access());
|
||||||
}
|
}
|
||||||
|
|
||||||
return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_,
|
|
||||||
&storage_->constraints_, storage_->config_.items));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t DiskStorage::DiskAccessor::ApproximateVertexCount() const {
|
uint64_t DiskStorage::DiskAccessor::ApproximateVertexCount() const {
|
||||||
@ -765,9 +830,9 @@ std::optional<VertexAccessor> DiskStorage::DiskAccessor::FindVertex(storage::Gid
|
|||||||
auto it = std::unique_ptr<rocksdb::Iterator>(
|
auto it = std::unique_ptr<rocksdb::Iterator>(
|
||||||
disk_transaction_->GetIterator(read_opts, disk_storage->kvstore_->vertex_chandle));
|
disk_transaction_->GetIterator(read_opts, disk_storage->kvstore_->vertex_chandle));
|
||||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||||
const auto &key = it->key();
|
std::string key = it->key().ToString();
|
||||||
if (Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key.ToString()))) == gid) {
|
if (Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key))) == gid) {
|
||||||
return LoadVertexToMainMemoryCache(key, it->value());
|
return LoadVertexToMainMemoryCache(std::move(key), it->value().ToString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
@ -1134,11 +1199,12 @@ Result<std::optional<EdgeAccessor>> DiskStorage::DiskAccessor::DeleteEdge(EdgeAc
|
|||||||
/// TODO: at which storage naming
|
/// TODO: at which storage naming
|
||||||
/// TODO: this method should also delete the old key
|
/// TODO: this method should also delete the old key
|
||||||
bool DiskStorage::DiskAccessor::WriteVertexToDisk(const Vertex &vertex) {
|
bool DiskStorage::DiskAccessor::WriteVertexToDisk(const Vertex &vertex) {
|
||||||
|
MG_ASSERT(commit_timestamp_.has_value(), "Writing vertex to disk but commit timestamp not set.");
|
||||||
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
||||||
auto status = disk_transaction_->Put(disk_storage->kvstore_->vertex_chandle, utils::SerializeVertex(vertex),
|
auto status = disk_transaction_->Put(disk_storage->kvstore_->vertex_chandle, utils::SerializeVertex(vertex),
|
||||||
utils::SerializeProperties(vertex.properties));
|
utils::SerializeProperties(vertex.properties));
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
spdlog::debug("rocksdb: Saved vertex with key {} and ts {}", utils::SerializeVertex(vertex), *commit_timestamp_);
|
spdlog::trace("rocksdb: Saved vertex with key {} and ts {}", utils::SerializeVertex(vertex), *commit_timestamp_);
|
||||||
} else if (status.IsBusy()) {
|
} else if (status.IsBusy()) {
|
||||||
spdlog::error("rocksdb: Vertex with key {} and ts {} was changed and committed in another transaction",
|
spdlog::error("rocksdb: Vertex with key {} and ts {} was changed and committed in another transaction",
|
||||||
utils::SerializeVertex(vertex), *commit_timestamp_);
|
utils::SerializeVertex(vertex), *commit_timestamp_);
|
||||||
@ -1153,6 +1219,7 @@ bool DiskStorage::DiskAccessor::WriteVertexToDisk(const Vertex &vertex) {
|
|||||||
|
|
||||||
/// TODO: at which storage naming
|
/// TODO: at which storage naming
|
||||||
bool DiskStorage::DiskAccessor::WriteEdgeToDisk(const EdgeRef edge, const std::string &serializedEdgeKey) {
|
bool DiskStorage::DiskAccessor::WriteEdgeToDisk(const EdgeRef edge, const std::string &serializedEdgeKey) {
|
||||||
|
MG_ASSERT(commit_timestamp_.has_value(), "Writing vertex to disk but commit timestamp not set.");
|
||||||
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
||||||
rocksdb::Status status;
|
rocksdb::Status status;
|
||||||
if (config_.properties_on_edges) {
|
if (config_.properties_on_edges) {
|
||||||
@ -1162,7 +1229,7 @@ bool DiskStorage::DiskAccessor::WriteEdgeToDisk(const EdgeRef edge, const std::s
|
|||||||
status = disk_transaction_->Put(disk_storage->kvstore_->edge_chandle, serializedEdgeKey, "");
|
status = disk_transaction_->Put(disk_storage->kvstore_->edge_chandle, serializedEdgeKey, "");
|
||||||
}
|
}
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
spdlog::debug("rocksdb: Saved edge with key {} and ts {}", serializedEdgeKey, *commit_timestamp_);
|
spdlog::trace("rocksdb: Saved edge with key {} and ts {}", serializedEdgeKey, *commit_timestamp_);
|
||||||
} else if (status.IsBusy()) {
|
} else if (status.IsBusy()) {
|
||||||
spdlog::error("rocksdb: Edge with key {} and ts {} was changed and committed in another transaction",
|
spdlog::error("rocksdb: Edge with key {} and ts {} was changed and committed in another transaction",
|
||||||
serializedEdgeKey, *commit_timestamp_);
|
serializedEdgeKey, *commit_timestamp_);
|
||||||
@ -1178,7 +1245,7 @@ bool DiskStorage::DiskAccessor::DeleteVertexFromDisk(const std::string &vertex)
|
|||||||
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
||||||
auto status = disk_transaction_->Delete(disk_storage->kvstore_->vertex_chandle, vertex);
|
auto status = disk_transaction_->Delete(disk_storage->kvstore_->vertex_chandle, vertex);
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
spdlog::debug("rocksdb: Deleted vertex with key {}", vertex);
|
spdlog::trace("rocksdb: Deleted vertex with key {}", vertex);
|
||||||
} else if (status.IsBusy()) {
|
} else if (status.IsBusy()) {
|
||||||
spdlog::error("rocksdb: Vertex with key {} was changed and committed in another transaction", vertex);
|
spdlog::error("rocksdb: Vertex with key {} was changed and committed in another transaction", vertex);
|
||||||
return false;
|
return false;
|
||||||
@ -1193,7 +1260,7 @@ bool DiskStorage::DiskAccessor::DeleteEdgeFromDisk(const std::string &edge) {
|
|||||||
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
||||||
auto status = disk_transaction_->Delete(disk_storage->kvstore_->edge_chandle, edge);
|
auto status = disk_transaction_->Delete(disk_storage->kvstore_->edge_chandle, edge);
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
spdlog::debug("rocksdb: Deleted edge with key {}", edge);
|
spdlog::trace("rocksdb: Deleted edge with key {}", edge);
|
||||||
} else if (status.IsBusy()) {
|
} else if (status.IsBusy()) {
|
||||||
spdlog::error("rocksdb: Edge with key {} was changed and committed in another transaction", edge);
|
spdlog::error("rocksdb: Edge with key {} was changed and committed in another transaction", edge);
|
||||||
return false;
|
return false;
|
||||||
@ -1444,7 +1511,7 @@ utils::BasicResult<StorageDataManipulationError, void> DiskStorage::DiskAccessor
|
|||||||
spdlog::error("rocksdb: Commit failed with status {}", commitStatus.ToString());
|
spdlog::error("rocksdb: Commit failed with status {}", commitStatus.ToString());
|
||||||
return StorageDataManipulationError{SerializationError{}};
|
return StorageDataManipulationError{SerializationError{}};
|
||||||
}
|
}
|
||||||
spdlog::debug("rocksdb: Commit successful");
|
spdlog::trace("rocksdb: Commit successful");
|
||||||
|
|
||||||
is_transaction_active_ = false;
|
is_transaction_active_ = false;
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
#include "storage/v2/id_types.hpp"
|
#include "storage/v2/id_types.hpp"
|
||||||
#include "storage/v2/isolation_level.hpp"
|
#include "storage/v2/isolation_level.hpp"
|
||||||
#include "storage/v2/property_store.hpp"
|
#include "storage/v2/property_store.hpp"
|
||||||
|
#include "storage/v2/property_value.hpp"
|
||||||
#include "storage/v2/storage.hpp"
|
#include "storage/v2/storage.hpp"
|
||||||
#include "utils/rw_lock.hpp"
|
#include "utils/rw_lock.hpp"
|
||||||
|
|
||||||
@ -58,14 +59,48 @@ class DiskStorage final : public Storage {
|
|||||||
|
|
||||||
VerticesIterable Vertices(LabelId label, View view) override;
|
VerticesIterable Vertices(LabelId label, View view) override;
|
||||||
|
|
||||||
|
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelIndexCache(LabelId label, View view,
|
||||||
|
std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices);
|
||||||
|
|
||||||
|
void LoadVerticesFromDiskLabelIndex(LabelId label, const std::unordered_set<storage::Gid> &gids,
|
||||||
|
std::list<Delta> &index_deltas, utils::SkipList<Vertex> *indexed_vertices);
|
||||||
|
|
||||||
VerticesIterable Vertices(LabelId label, PropertyId property, View view) override;
|
VerticesIterable Vertices(LabelId label, PropertyId property, View view) override;
|
||||||
|
|
||||||
|
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
|
||||||
|
LabelId label, PropertyId property, View view, std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices, const auto &label_property_filter);
|
||||||
|
|
||||||
|
void LoadVerticesFromDiskLabelPropertyIndex(LabelId label, PropertyId property,
|
||||||
|
const std::unordered_set<storage::Gid> &gids,
|
||||||
|
std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices,
|
||||||
|
const auto &label_property_filter);
|
||||||
|
|
||||||
VerticesIterable Vertices(LabelId label, PropertyId property, const PropertyValue &value, View view) override;
|
VerticesIterable Vertices(LabelId label, PropertyId property, const PropertyValue &value, View view) override;
|
||||||
|
|
||||||
|
void LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(LabelId label, PropertyId property,
|
||||||
|
const std::unordered_set<storage::Gid> &gids,
|
||||||
|
const PropertyValue &value,
|
||||||
|
std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices);
|
||||||
|
|
||||||
VerticesIterable Vertices(LabelId label, PropertyId property,
|
VerticesIterable Vertices(LabelId label, PropertyId property,
|
||||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
|
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
|
||||||
|
|
||||||
|
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch(
|
||||||
|
LabelId label, PropertyId property, View view, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||||
|
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices);
|
||||||
|
|
||||||
|
void LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(
|
||||||
|
LabelId label, PropertyId property, const std::unordered_set<storage::Gid> &gids,
|
||||||
|
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||||
|
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
|
||||||
|
utils::SkipList<Vertex> *indexed_vertices);
|
||||||
|
|
||||||
uint64_t ApproximateVertexCount() const override;
|
uint64_t ApproximateVertexCount() const override;
|
||||||
|
|
||||||
uint64_t ApproximateVertexCount(LabelId /*label*/) const override { return 10; }
|
uint64_t ApproximateVertexCount(LabelId /*label*/) const override { return 10; }
|
||||||
@ -161,14 +196,13 @@ class DiskStorage final : public Storage {
|
|||||||
void FinalizeTransaction() override;
|
void FinalizeTransaction() override;
|
||||||
|
|
||||||
std::optional<storage::VertexAccessor> LoadVertexToLabelIndexCache(
|
std::optional<storage::VertexAccessor> LoadVertexToLabelIndexCache(
|
||||||
const rocksdb::Slice &key, const rocksdb::Slice &value, Delta *index_delta,
|
std::string &&key, std::string &&value, Delta *index_delta,
|
||||||
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
||||||
|
|
||||||
std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(const rocksdb::Slice &key,
|
std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(std::string &&key, std::string &&value);
|
||||||
const rocksdb::Slice &value);
|
|
||||||
|
|
||||||
std::optional<storage::VertexAccessor> LoadVertexToLabelPropertyIndexCache(
|
std::optional<storage::VertexAccessor> LoadVertexToLabelPropertyIndexCache(
|
||||||
const rocksdb::Slice &key, const rocksdb::Slice &value, Delta *index_delta,
|
std::string &&key, std::string &&value, Delta *index_delta,
|
||||||
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
||||||
|
|
||||||
std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value);
|
std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value);
|
||||||
|
@ -175,6 +175,10 @@ bool DiskUniqueConstraints::ClearDeletedVertex(const std::string_view gid,
|
|||||||
|
|
||||||
bool DiskUniqueConstraints::DeleteVerticesWithRemovedConstraintLabel(uint64_t transaction_start_timestamp,
|
bool DiskUniqueConstraints::DeleteVerticesWithRemovedConstraintLabel(uint64_t transaction_start_timestamp,
|
||||||
uint64_t transaction_commit_timestamp) {
|
uint64_t transaction_commit_timestamp) {
|
||||||
|
if (entries_for_deletion->empty()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
auto disk_transaction = std::unique_ptr<rocksdb::Transaction>(
|
auto disk_transaction = std::unique_ptr<rocksdb::Transaction>(
|
||||||
kvstore_->db_->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions()));
|
kvstore_->db_->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions()));
|
||||||
disk_transaction->SetReadTimestampForValidation(std::numeric_limits<uint64_t>::max());
|
disk_transaction->SetReadTimestampForValidation(std::numeric_limits<uint64_t>::max());
|
||||||
@ -198,12 +202,11 @@ bool DiskUniqueConstraints::DeleteVerticesWithRemovedConstraintLabel(uint64_t tr
|
|||||||
disk_transaction->SetCommitTimestamp(transaction_commit_timestamp);
|
disk_transaction->SetCommitTimestamp(transaction_commit_timestamp);
|
||||||
auto status = disk_transaction->Commit();
|
auto status = disk_transaction->Commit();
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
/// TODO: better naming
|
|
||||||
spdlog::error("rocksdb: {}", status.getState());
|
spdlog::error("rocksdb: {}", status.getState());
|
||||||
}
|
}
|
||||||
return status.ok();
|
return status.ok();
|
||||||
}
|
}
|
||||||
spdlog::error("Deletetion of vertices with removed constraint label failed.");
|
spdlog::error("Deletion of vertices with removed constraint label failed.");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,7 +217,7 @@ bool DiskUniqueConstraints::SyncVertexToUniqueConstraintsStorage(const Vertex &v
|
|||||||
kvstore_->db_->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions()));
|
kvstore_->db_->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions()));
|
||||||
|
|
||||||
if (auto maybe_old_disk_key = utils::GetOldDiskKeyOrNull(vertex.delta); maybe_old_disk_key.has_value()) {
|
if (auto maybe_old_disk_key = utils::GetOldDiskKeyOrNull(vertex.delta); maybe_old_disk_key.has_value()) {
|
||||||
spdlog::debug("Found old disk key {} for vertex {}", maybe_old_disk_key.value(),
|
spdlog::trace("Found old disk key {} for vertex {}", maybe_old_disk_key.value(),
|
||||||
utils::SerializeIdType(vertex.gid));
|
utils::SerializeIdType(vertex.gid));
|
||||||
if (auto status = disk_transaction->Delete(maybe_old_disk_key.value()); !status.ok()) {
|
if (auto status = disk_transaction->Delete(maybe_old_disk_key.value()); !status.ok()) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -263,7 +263,6 @@ TYPED_TEST(IndexTest, LabelIndexBasic) {
|
|||||||
|
|
||||||
for (int i = 0; i < 10; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
auto vertex = this->CreateVertex(acc.get());
|
auto vertex = this->CreateVertex(acc.get());
|
||||||
spdlog::debug("Created vertex with gid: {}", memgraph::utils::SerializeIdType(vertex.Gid()));
|
|
||||||
ASSERT_NO_ERROR(vertex.AddLabel(i % 2 ? this->label1 : this->label2));
|
ASSERT_NO_ERROR(vertex.AddLabel(i % 2 ? this->label1 : this->label2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user