Optimize import of edges on disk (#1132)

This commit is contained in:
Andi 2023-08-10 11:53:07 +02:00 committed by GitHub
parent 509183e985
commit adf7533751
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 66 deletions

View File

@ -68,6 +68,19 @@ constexpr const char *label_property_index_str = "label_property_index";
constexpr const char *existence_constraints_str = "existence_constraints";
constexpr const char *unique_constraints_str = "unique_constraints";
bool VertexNeedsToBeSerialized(const Vertex &vertex) {
Delta *head = vertex.delta;
while (head != nullptr) {
if (head->action == Delta::Action::ADD_LABEL || head->action == Delta::Action::REMOVE_LABEL ||
head->action == Delta::Action::DELETE_OBJECT || head->action == Delta::Action::RECREATE_OBJECT ||
head->action == Delta::Action::SET_PROPERTY) {
return true;
}
head = head->next;
}
return false;
}
/// Reports whether the given skip-list accessor already holds an entry for `gid`.
///
/// @param accessor accessor that is searched with `find`.
/// @param gid      id to look up.
/// @return true when `find(gid)` yields something other than `accessor.end()`.
bool VertexExistsInCache(const utils::SkipList<Vertex>::Accessor &accessor, Gid gid) {
  const auto position = accessor.find(gid);
  return position != accessor.end();
}
@ -315,13 +328,14 @@ std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToMa
if (VertexExistsInCache(main_storage_accessor, gid)) {
return std::nullopt;
}
std::vector<LabelId> labels_id = utils::DeserializeLabelsFromMainDiskStorage(key);
return CreateVertex(main_storage_accessor, gid, labels_id, utils::DeserializePropertiesFromMainDiskStorage(value),
std::vector<LabelId> labels_id{utils::DeserializeLabelsFromMainDiskStorage(key)};
PropertyStore properties{utils::DeserializePropertiesFromMainDiskStorage(value)};
return CreateVertex(main_storage_accessor, gid, std::move(labels_id), std::move(properties),
CreateDeleteDeserializedObjectDelta(&transaction_, key));
}
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLabelIndexCache(
std::string &&key, std::string &&value, Delta *index_delta,
LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
utils::SkipList<storage::Vertex>::Accessor index_accessor) {
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key)));
if (VertexExistsInCache(index_accessor, gid)) {
@ -329,12 +343,13 @@ std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLa
}
std::vector<LabelId> labels_id{utils::DeserializeLabelsFromLabelIndexStorage(value)};
labels_id.push_back(indexing_label);
PropertyStore properties{utils::DeserializePropertiesFromLabelIndexStorage(value)};
return CreateVertex(index_accessor, gid, labels_id, std::move(properties), index_delta);
return CreateVertex(index_accessor, gid, std::move(labels_id), std::move(properties), index_delta);
}
std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLabelPropertyIndexCache(
std::string &&key, std::string &&value, Delta *index_delta,
LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
utils::SkipList<storage::Vertex>::Accessor index_accessor) {
storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key)));
if (VertexExistsInCache(index_accessor, gid)) {
@ -342,8 +357,9 @@ std::optional<storage::VertexAccessor> DiskStorage::DiskAccessor::LoadVertexToLa
}
std::vector<LabelId> labels_id{utils::DeserializeLabelsFromLabelPropertyIndexStorage(value)};
labels_id.push_back(indexing_label);
PropertyStore properties{utils::DeserializePropertiesFromLabelPropertyIndexStorage(value)};
return CreateVertex(index_accessor, gid, labels_id, std::move(properties), index_delta);
return CreateVertex(index_accessor, gid, std::move(labels_id), std::move(properties), index_delta);
}
std::optional<EdgeAccessor> DiskStorage::DiskAccessor::DeserializeEdge(const rocksdb::Slice &key,
@ -380,6 +396,10 @@ std::optional<EdgeAccessor> DiskStorage::DiskAccessor::DeserializeEdge(const roc
}
VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) {
if (scanned_all_vertices_) {
return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_,
&storage_->constraints_, storage_->config_.items));
}
auto *disk_storage = static_cast<DiskStorage *>(storage_);
rocksdb::ReadOptions ro;
std::string strTs = utils::StringTimestamp(transaction_.start_timestamp);
@ -390,6 +410,7 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {
LoadVertexToMainMemoryCache(it->key().ToString(), it->value().ToString());
}
scanned_all_vertices_ = true;
return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_,
&storage_->constraints_, storage_->config_.items));
}
@ -418,7 +439,8 @@ std::unordered_set<Gid> DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWit
if (VertexHasLabel(vertex, label, &transaction_, view)) {
spdlog::trace("Loaded vertex with gid: {} from main index storage to label index",
utils::SerializeIdType(vertex.gid));
LoadVertexToLabelIndexCache(utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
/// TODO: here we are doing serialization and then later deserialization again -> expensive
LoadVertexToLabelIndexCache(label, utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties),
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
indexed_vertices->access());
@ -447,7 +469,7 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelIndex(LabelId label,
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key)));
spdlog::trace("Loaded vertex with key: {} from label index storage", key);
if (key.starts_with(serialized_label) && !utils::Contains(gids, curr_gid)) {
LoadVertexToLabelIndexCache(index_it->key().ToString(), index_it->value().ToString(),
LoadVertexToLabelIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
indexed_vertices->access());
}
@ -493,7 +515,7 @@ std::unordered_set<Gid> DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWit
/// TODO: delta support for clearing old disk keys
if (label_property_filter(vertex, label, property, view)) {
LoadVertexToLabelPropertyIndexCache(
utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
label, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
indexed_vertices->access());
@ -525,7 +547,7 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndex(LabelId l
Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key)));
/// TODO: optimize
if (label_property_filter(key, label_property_prefix, gids, curr_gid)) {
LoadVertexToLabelPropertyIndexCache(index_it->key().ToString(), index_it->value().ToString(),
LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
indexed_vertices->access());
}
@ -577,7 +599,7 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexWithPointV
PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value);
if (key.starts_with(label_property_prefix) && !utils::Contains(gids, curr_gid) &&
properties.IsPropertyEqual(property, value)) {
LoadVertexToLabelPropertyIndexCache(index_it->key().ToString(), index_it->value().ToString(),
LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key),
indexed_vertices->access());
}
@ -618,7 +640,7 @@ DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelPropertyIndexCache
if (VertexHasLabel(vertex, label, &transaction_, view) &&
IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
LoadVertexToLabelPropertyIndexCache(
utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
label, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt),
indexed_vertices->access());
@ -656,7 +678,7 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexForInterva
!IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
continue;
}
LoadVertexToLabelPropertyIndexCache(index_it->key().ToString(), index_it->value().ToString(),
LoadVertexToLabelPropertyIndexCache(label, index_it->key().ToString(), index_it->value().ToString(),
CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str),
indexed_vertices->access());
}
@ -787,8 +809,8 @@ VertexAccessor DiskStorage::DiskAccessor::CreateVertex() {
}
VertexAccessor DiskStorage::DiskAccessor::CreateVertex(utils::SkipList<Vertex>::Accessor &accessor, storage::Gid gid,
const std::vector<LabelId> &label_ids,
PropertyStore &&properties, Delta *delta) {
std::vector<LabelId> &&label_ids, PropertyStore &&properties,
Delta *delta) {
OOMExceptionEnabler oom_exception;
auto *disk_storage = static_cast<DiskStorage *>(storage_);
disk_storage->vertex_id_.store(std::max(disk_storage->vertex_id_.load(std::memory_order_acquire), gid.AsUint() + 1),
@ -796,14 +818,9 @@ VertexAccessor DiskStorage::DiskAccessor::CreateVertex(utils::SkipList<Vertex>::
auto [it, inserted] = accessor.insert(Vertex{gid, delta});
MG_ASSERT(inserted, "The vertex must be inserted here!");
MG_ASSERT(it != accessor.end(), "Invalid Vertex accessor!");
/// TODO: move
for (auto label_id : label_ids) {
it->labels.push_back(label_id);
}
it->labels = std::move(label_ids);
it->properties = std::move(properties);
if (delta) {
delta->prev.Set(&*it);
}
delta->prev.Set(&*it);
return {&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_};
}
@ -1300,33 +1317,35 @@ DiskStorage::DiskAccessor::CheckVertexConstraintsBeforeCommit(
/// TODO: andi I don't like that std::optional is used for checking errors but that's how it was before, refactor!
for (Vertex &vertex : vertex_acc) {
if (auto check_result = CheckVertexConstraintsBeforeCommit(vertex, unique_storage); check_result.HasError()) {
return check_result.GetError();
}
if (VertexNeedsToBeSerialized(vertex)) {
if (auto check_result = CheckVertexConstraintsBeforeCommit(vertex, unique_storage); check_result.HasError()) {
return check_result.GetError();
}
/// TODO: what if something is changed and then deleted
if (vertex.deleted) {
continue;
}
/// TODO: what if something is changed and then deleted and how does this work connected to indices and
/// constraints
if (vertex.deleted) {
continue;
}
/// TODO: expose temporal coupling
/// NOTE: this deletion has to come before writing, otherwise RocksDB thinks that all entries are deleted
/// TODO: This has to deal with index storage if read from index cache
if (auto maybe_old_disk_key = utils::GetOldDiskKeyOrNull(vertex.delta); maybe_old_disk_key.has_value()) {
if (!DeleteVertexFromDisk(maybe_old_disk_key.value())) {
/// TODO: expose temporal coupling
/// NOTE: this deletion has to come before writing, otherwise RocksDB thinks that all entries are deleted
/// TODO: This has to deal with index storage if read from index cache
if (auto maybe_old_disk_key = utils::GetOldDiskKeyOrNull(vertex.delta); maybe_old_disk_key.has_value()) {
if (!DeleteVertexFromDisk(maybe_old_disk_key.value())) {
return StorageDataManipulationError{SerializationError{}};
}
}
if (!WriteVertexToDisk(vertex)) {
return StorageDataManipulationError{SerializationError{}};
}
}
if (!WriteVertexToDisk(vertex)) {
return StorageDataManipulationError{SerializationError{}};
}
/// TODO: andi don't ignore the return value
if (!disk_unique_constraints->SyncVertexToUniqueConstraintsStorage(vertex, *commit_timestamp_) ||
!disk_label_index->SyncVertexToLabelIndexStorage(vertex, *commit_timestamp_) ||
!disk_label_property_index->SyncVertexToLabelPropertyIndexStorage(vertex, *commit_timestamp_)) {
return StorageDataManipulationError{SerializationError{}};
if (!disk_unique_constraints->SyncVertexToUniqueConstraintsStorage(vertex, *commit_timestamp_) ||
!disk_label_index->SyncVertexToLabelIndexStorage(vertex, *commit_timestamp_) ||
!disk_label_property_index->SyncVertexToLabelPropertyIndexStorage(vertex, *commit_timestamp_)) {
return StorageDataManipulationError{SerializationError{}};
}
}
for (auto &edge_entry : vertex.out_edges) {

View File

@ -197,20 +197,20 @@ class DiskStorage final : public Storage {
void FinalizeTransaction() override;
std::optional<storage::VertexAccessor> LoadVertexToLabelIndexCache(
std::string &&key, std::string &&value, Delta *index_delta,
LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
utils::SkipList<storage::Vertex>::Accessor index_accessor);
std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(std::string &&key, std::string &&value);
std::optional<storage::VertexAccessor> LoadVertexToLabelPropertyIndexCache(
std::string &&key, std::string &&value, Delta *index_delta,
LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
utils::SkipList<storage::Vertex>::Accessor index_accessor);
std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value);
private:
VertexAccessor CreateVertex(utils::SkipList<Vertex>::Accessor &accessor, storage::Gid gid,
const std::vector<LabelId> &label_ids, PropertyStore &&properties, Delta *delta);
std::vector<LabelId> &&label_ids, PropertyStore &&properties, Delta *delta);
bool PrefetchEdgeFilter(const std::string_view disk_edge_key_str, const VertexAccessor &vertex_acc,
EdgeDirection edge_direction);
@ -246,6 +246,7 @@ class DiskStorage final : public Storage {
std::unordered_set<std::string> edges_to_delete_;
std::vector<std::pair<std::string, std::string>> vertices_to_delete_;
rocksdb::Transaction *disk_transaction_;
bool scanned_all_vertices_ = false;
};
std::unique_ptr<Storage::Accessor> Access(std::optional<IsolationLevel> override_isolation_level) override {

View File

@ -36,18 +36,27 @@ inline std::string SerializeIdType(const auto &id) { return std::to_string(id.As
inline bool SerializedVertexHasLabels(const std::string &labels) { return !labels.empty(); }
template <typename Collection>
inline std::vector<std::string> TransformIDsToString(const Collection &labels) {
std::vector<std::string> transformed_labels{};
std::transform(labels.begin(), labels.end(), std::back_inserter(transformed_labels),
[](const auto &label) { return SerializeIdType(label); });
return transformed_labels;
/// Satisfied by types whose const objects expose `size()` returning exactly `size_t`.
template <typename T>
concept WithSize = requires(const T value) {
  { value.size() } -> std::same_as<size_t>;
};

/// Converts every element of `col` to its string form via `SerializeIdType`.
///
/// The output vector is reserved to `col.size()` up front, so filling it does not reallocate.
/// Elements appear in the iteration order of `col`.
template <WithSize TCollection>
inline std::vector<std::string> TransformIDsToString(const TCollection &col) {
  std::vector<std::string> serialized_ids;
  serialized_ids.reserve(col.size());
  for (const auto &id : col) {
    serialized_ids.push_back(SerializeIdType(id));
  }
  return serialized_ids;
}
inline std::vector<storage::LabelId> TransformFromStringLabels(const std::vector<std::string> &labels) {
inline std::vector<storage::LabelId> TransformFromStringLabels(std::vector<std::string> &&labels) {
std::vector<storage::LabelId> transformed_labels;
std::transform(labels.begin(), labels.end(), std::back_inserter(transformed_labels),
[](const auto &label) { return storage::LabelId::FromUint(std::stoull(label)); });
transformed_labels.reserve(labels.size());
for (const std::string &label : labels) {
transformed_labels.emplace_back(storage::LabelId::FromUint(std::stoull(label)));
}
return transformed_labels;
}
@ -91,9 +100,13 @@ inline std::string SerializeVertexAsValueForAuxiliaryStorages(storage::LabelId l
const std::vector<storage::LabelId> &vertex_labels,
const storage::PropertyStore &property_store) {
std::vector<storage::LabelId> labels_without_target;
std::copy_if(vertex_labels.begin(), vertex_labels.end(), std::back_inserter(labels_without_target),
[&label_to_remove](const auto &label) { return label_to_remove != label; });
std::string result = SerializeLabels(TransformIDsToString(vertex_labels)) + "|";
labels_without_target.reserve(vertex_labels.size());
for (const storage::LabelId &label : vertex_labels) {
if (label != label_to_remove) {
labels_without_target.emplace_back(label);
}
}
std::string result = SerializeLabels(TransformIDsToString(labels_without_target)) + "|";
return result + SerializeProperties(property_store);
}
@ -115,8 +128,7 @@ inline std::string SerializeVertex(const storage::Vertex &vertex) {
}
inline std::vector<storage::LabelId> DeserializeLabelsFromMainDiskStorage(const std::string &key) {
std::vector<std::string> key_vector = utils::Split(key, "|");
std::string labels_str = key_vector[0];
std::string labels_str = key.substr(0, key.find('|'));
if (SerializedVertexHasLabels(labels_str)) {
return TransformFromStringLabels(utils::Split(labels_str, ","));
}
@ -181,9 +193,13 @@ inline std::string SerializeVertexAsValueForLabelIndex(storage::LabelId indexing
return SerializeVertexAsValueForAuxiliaryStorages(indexing_label, vertex_labels, property_store);
}
/// Parses the label ids stored at the front of an index-storage value.
///
/// The label section is everything before the first '|' in `value`, or the whole string when
/// no '|' is present. That section is split on ',' and each token is converted by
/// `TransformFromStringLabels`.
///
/// NOTE(review): unlike DeserializeLabelsFromMainDiskStorage, there is no
/// SerializedVertexHasLabels check here. Presumably utils::Split yields no tokens for an
/// empty label section -- confirm.
inline std::vector<storage::LabelId> DeserializeLabelsFromIndexStorage(const std::string &value) {
  const auto labels_end = value.find('|');
  const std::string serialized_labels(value, 0, labels_end);
  return TransformFromStringLabels(utils::Split(serialized_labels, ","));
}
inline std::vector<storage::LabelId> DeserializeLabelsFromLabelIndexStorage(const std::string &value) {
const auto value_splitted = utils::Split(value, "|");
return TransformFromStringLabels(utils::Split(value_splitted[0], ","));
return DeserializeLabelsFromIndexStorage(value);
}
inline storage::PropertyStore DeserializePropertiesFromLabelIndexStorage(const std::string &value) {
@ -213,10 +229,8 @@ inline std::string ExtractGidFromLabelPropertyIndexStorage(const std::string &ke
return key_vector[2];
}
/// TODO: refactor into one method with label index storage
inline std::vector<storage::LabelId> DeserializeLabelsFromLabelPropertyIndexStorage(const std::string &value) {
const auto value_splitted = utils::Split(value, "|");
return TransformFromStringLabels(utils::Split(value_splitted[0], ","));
return DeserializeLabelsFromIndexStorage(value);
}
inline storage::PropertyStore DeserializePropertiesFromLabelPropertyIndexStorage(const std::string &value) {