Refactor disk storage (#1347)
This commit is contained in:
parent
766ac48261
commit
de9280b334
@ -1548,7 +1548,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
||||
// populates the to_visit_next_ structure with expansions
|
||||
// from the given vertex. skips expansions that don't satisfy
|
||||
// the "where" condition.
|
||||
auto expand_from_vertex = [this, &expand_pair, &context](const auto &vertex) {
|
||||
auto expand_from_vertex = [this, &expand_pair](const auto &vertex) {
|
||||
if (self_.common_.direction != EdgeAtom::Direction::IN) {
|
||||
auto out_edges = UnwrapEdgesResult(vertex.OutEdges(storage::View::OLD, self_.common_.edge_types)).edges;
|
||||
for (const auto &edge : out_edges) expand_pair(edge, edge.To());
|
||||
@ -1748,8 +1748,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
// Populates the priority queue structure with expansions
|
||||
// from the given vertex. skips expansions that don't satisfy
|
||||
// the "where" condition.
|
||||
auto expand_from_vertex = [this, &expand_pair, &context](const VertexAccessor &vertex, const TypedValue &weight,
|
||||
int64_t depth) {
|
||||
auto expand_from_vertex = [this, &expand_pair](const VertexAccessor &vertex, const TypedValue &weight,
|
||||
int64_t depth) {
|
||||
if (self_.common_.direction != EdgeAtom::Direction::IN) {
|
||||
auto out_edges = UnwrapEdgesResult(vertex.OutEdges(storage::View::OLD, self_.common_.edge_types)).edges;
|
||||
for (const auto &edge : out_edges) {
|
||||
|
@ -25,6 +25,7 @@ add_library(mg-storage-v2 STATIC
|
||||
inmemory/label_index.cpp
|
||||
inmemory/label_property_index.cpp
|
||||
inmemory/unique_constraints.cpp
|
||||
disk/durable_metadata.cpp
|
||||
disk/edge_import_mode_cache.cpp
|
||||
disk/storage.cpp
|
||||
disk/rocksdb_storage.cpp
|
||||
|
173
src/storage/v2/disk/durable_metadata.cpp
Normal file
173
src/storage/v2/disk/durable_metadata.cpp
Normal file
@ -0,0 +1,173 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <charconv>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "kvstore/kvstore.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
#include "storage/v2/disk/durable_metadata.hpp"
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/rocksdb_serialization.hpp"
|
||||
#include "utils/string.hpp"
|
||||
|
||||
namespace {
|
||||
constexpr const char *kLastTransactionStartTimeStamp = "last_transaction_start_timestamp";
|
||||
constexpr const char *kVertexCountDescr = "vertex_count";
|
||||
constexpr const char *kEdgeDountDescr = "edge_count";
|
||||
constexpr const char *kLabelIndexStr = "label_index";
|
||||
constexpr const char *kLabelPropertyIndexStr = "label_property_index";
|
||||
constexpr const char *kExistenceConstraintsStr = "existence_constraints";
|
||||
constexpr const char *kUniqueConstraintsStr = "unique_constraints";
|
||||
} // namespace
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
DurableMetadata::DurableMetadata(const Config &config)
|
||||
: durability_kvstore_(kvstore::KVStore(config.disk.durability_directory)), config_(config) {
|
||||
MG_ASSERT(utils::DirExists(config_.disk.durability_directory),
|
||||
"Durability directory for saving disk metadata does not exist.");
|
||||
}
|
||||
|
||||
DurableMetadata::DurableMetadata(DurableMetadata &&other) noexcept
|
||||
: durability_kvstore_(std::move(other.durability_kvstore_)), config_(std::move(other.config_)) {}
|
||||
|
||||
void DurableMetadata::SaveBeforeClosingDB(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count) {
|
||||
durability_kvstore_.Put(kLastTransactionStartTimeStamp, std::to_string(timestamp));
|
||||
durability_kvstore_.Put(kVertexCountDescr, std::to_string(vertex_count));
|
||||
durability_kvstore_.Put(kEdgeDountDescr, std::to_string(edge_count));
|
||||
}
|
||||
|
||||
std::optional<uint64_t> DurableMetadata::LoadTimestampIfExists() const {
|
||||
return LoadPropertyIfExists(kLastTransactionStartTimeStamp);
|
||||
}
|
||||
|
||||
std::optional<uint64_t> DurableMetadata::LoadVertexCountIfExists() const {
|
||||
return LoadPropertyIfExists(kVertexCountDescr);
|
||||
}
|
||||
|
||||
std::optional<uint64_t> DurableMetadata::LoadEdgeCountIfExists() const { return LoadPropertyIfExists(kEdgeDountDescr); }
|
||||
|
||||
std::optional<uint64_t> DurableMetadata::LoadPropertyIfExists(const std::string &property) const {
|
||||
if (auto count = durability_kvstore_.Get(property); count.has_value()) {
|
||||
auto last_count = count.value();
|
||||
uint64_t count_to_return{0U};
|
||||
if (std::from_chars(last_count.data(), last_count.data() + last_count.size(), count_to_return).ec == std::errc()) {
|
||||
return count_to_return;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
std::optional<std::vector<std::string>> DurableMetadata::LoadLabelIndexInfoIfExists() const {
|
||||
return LoadInfoFromAuxiliaryStorages(kLabelIndexStr);
|
||||
}
|
||||
|
||||
std::optional<std::vector<std::string>> DurableMetadata::LoadLabelPropertyIndexInfoIfExists() const {
|
||||
return LoadInfoFromAuxiliaryStorages(kLabelPropertyIndexStr);
|
||||
}
|
||||
|
||||
std::optional<std::vector<std::string>> DurableMetadata::LoadExistenceConstraintInfoIfExists() const {
|
||||
return LoadInfoFromAuxiliaryStorages(kExistenceConstraintsStr);
|
||||
}
|
||||
|
||||
std::optional<std::vector<std::string>> DurableMetadata::LoadUniqueConstraintInfoIfExists() const {
|
||||
return LoadInfoFromAuxiliaryStorages(kUniqueConstraintsStr);
|
||||
}
|
||||
|
||||
std::optional<std::vector<std::string>> DurableMetadata::LoadInfoFromAuxiliaryStorages(
|
||||
const std::string &property) const {
|
||||
if (auto maybe_props = durability_kvstore_.Get(property); maybe_props.has_value()) {
|
||||
return utils::Split(maybe_props.value(), "|");
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
bool DurableMetadata::PersistLabelIndexCreation(LabelId label) {
|
||||
const auto serialized_label = label.ToString();
|
||||
if (auto label_index_store = durability_kvstore_.Get(kLabelIndexStr); label_index_store.has_value()) {
|
||||
std::string &value = label_index_store.value();
|
||||
value += "|";
|
||||
value += serialized_label;
|
||||
return durability_kvstore_.Put(kLabelIndexStr, value);
|
||||
}
|
||||
return durability_kvstore_.Put(kLabelIndexStr, serialized_label);
|
||||
}
|
||||
|
||||
bool DurableMetadata::PersistLabelIndexDeletion(LabelId label) {
|
||||
const auto serialized_label = label.ToString();
|
||||
if (auto label_index_store = durability_kvstore_.Get(kLabelIndexStr); label_index_store.has_value()) {
|
||||
const std::string &value = label_index_store.value();
|
||||
std::vector<std::string> labels = utils::Split(value, "|");
|
||||
std::erase(labels, serialized_label);
|
||||
if (labels.empty()) {
|
||||
return durability_kvstore_.Delete(kLabelIndexStr);
|
||||
}
|
||||
return durability_kvstore_.Put(kLabelIndexStr, utils::Join(labels, "|"));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DurableMetadata::PersistLabelPropertyIndexAndExistenceConstraintCreation(LabelId label, PropertyId property,
|
||||
const std::string &key) {
|
||||
const std::string label_property_pair = label.ToString() + "," + property.ToString();
|
||||
if (auto label_property_index_store = durability_kvstore_.Get(key); label_property_index_store.has_value()) {
|
||||
std::string &value = label_property_index_store.value();
|
||||
value += "|";
|
||||
value += label_property_pair;
|
||||
return durability_kvstore_.Put(key, value);
|
||||
}
|
||||
return durability_kvstore_.Put(key, label_property_pair);
|
||||
}
|
||||
|
||||
bool DurableMetadata::PersistLabelPropertyIndexAndExistenceConstraintDeletion(LabelId label, PropertyId property,
|
||||
const std::string &key) {
|
||||
const std::string label_property_pair = label.ToString() + "," + property.ToString();
|
||||
if (auto label_property_index_store = durability_kvstore_.Get(key); label_property_index_store.has_value()) {
|
||||
const std::string &value = label_property_index_store.value();
|
||||
std::vector<std::string> label_properties = utils::Split(value, "|");
|
||||
std::erase(label_properties, label_property_pair);
|
||||
if (label_properties.empty()) {
|
||||
return durability_kvstore_.Delete(key);
|
||||
}
|
||||
return durability_kvstore_.Put(key, utils::Join(label_properties, "|"));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DurableMetadata::PersistUniqueConstraintCreation(LabelId label, const std::set<PropertyId> &properties) {
|
||||
const std::string entry = utils::GetKeyForUniqueConstraintsDurability(label, properties);
|
||||
|
||||
if (auto unique_store = durability_kvstore_.Get(kUniqueConstraintsStr); unique_store.has_value()) {
|
||||
std::string &value = unique_store.value();
|
||||
value += "|" + entry;
|
||||
return durability_kvstore_.Put(kUniqueConstraintsStr, value);
|
||||
}
|
||||
return durability_kvstore_.Put(kUniqueConstraintsStr, entry);
|
||||
}
|
||||
|
||||
bool DurableMetadata::PersistUniqueConstraintDeletion(LabelId label, const std::set<PropertyId> &properties) {
|
||||
const std::string entry = utils::GetKeyForUniqueConstraintsDurability(label, properties);
|
||||
|
||||
if (auto unique_store = durability_kvstore_.Get(kUniqueConstraintsStr); unique_store.has_value()) {
|
||||
const std::string &value = unique_store.value();
|
||||
std::vector<std::string> unique_constraints = utils::Split(value, "|");
|
||||
std::erase(unique_constraints, entry);
|
||||
if (unique_constraints.empty()) {
|
||||
return durability_kvstore_.Delete(kUniqueConstraintsStr);
|
||||
}
|
||||
return durability_kvstore_.Put(kUniqueConstraintsStr, utils::Join(unique_constraints, "|"));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage
|
68
src/storage/v2/disk/durable_metadata.hpp
Normal file
68
src/storage/v2/disk/durable_metadata.hpp
Normal file
@ -0,0 +1,68 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
|
||||
#include "kvstore/kvstore.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
class DurableMetadata {
|
||||
public:
|
||||
explicit DurableMetadata(const Config &config);
|
||||
|
||||
DurableMetadata(const DurableMetadata &) = delete;
|
||||
DurableMetadata &operator=(const DurableMetadata &) = delete;
|
||||
DurableMetadata &operator=(DurableMetadata &&) = delete;
|
||||
|
||||
DurableMetadata(DurableMetadata &&other) noexcept;
|
||||
|
||||
~DurableMetadata() = default;
|
||||
|
||||
std::optional<uint64_t> LoadTimestampIfExists() const;
|
||||
std::optional<uint64_t> LoadVertexCountIfExists() const;
|
||||
std::optional<uint64_t> LoadEdgeCountIfExists() const;
|
||||
std::optional<std::vector<std::string>> LoadLabelIndexInfoIfExists() const;
|
||||
std::optional<std::vector<std::string>> LoadLabelPropertyIndexInfoIfExists() const;
|
||||
std::optional<std::vector<std::string>> LoadExistenceConstraintInfoIfExists() const;
|
||||
std::optional<std::vector<std::string>> LoadUniqueConstraintInfoIfExists() const;
|
||||
|
||||
void SaveBeforeClosingDB(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count);
|
||||
|
||||
bool PersistLabelIndexCreation(LabelId label);
|
||||
|
||||
bool PersistLabelIndexDeletion(LabelId label);
|
||||
|
||||
bool PersistLabelPropertyIndexAndExistenceConstraintCreation(LabelId label, PropertyId property,
|
||||
const std::string &key);
|
||||
|
||||
bool PersistLabelPropertyIndexAndExistenceConstraintDeletion(LabelId label, PropertyId property,
|
||||
const std::string &key);
|
||||
|
||||
bool PersistUniqueConstraintCreation(LabelId label, const std::set<PropertyId> &properties);
|
||||
|
||||
bool PersistUniqueConstraintDeletion(LabelId label, const std::set<PropertyId> &properties);
|
||||
|
||||
private:
|
||||
std::optional<uint64_t> LoadPropertyIfExists(const std::string &property) const;
|
||||
std::optional<std::vector<std::string>> LoadInfoFromAuxiliaryStorages(const std::string &property) const;
|
||||
|
||||
kvstore::KVStore durability_kvstore_;
|
||||
Config config_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::storage
|
@ -190,9 +190,10 @@ bool DiskLabelIndex::DropIndex(LabelId label) {
|
||||
rocksdb::Slice ts(strTs);
|
||||
ro.timestamp = &ts;
|
||||
auto it = std::unique_ptr<rocksdb::Iterator>(disk_transaction->GetIterator(ro));
|
||||
const std::string serialized_label = label.ToString();
|
||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||
std::string key = it->key().ToString();
|
||||
if (key.starts_with(utils::SerializeIdType(label))) {
|
||||
if (key.starts_with(serialized_label)) {
|
||||
disk_transaction->Delete(it->key().ToString());
|
||||
}
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -13,6 +13,7 @@
|
||||
|
||||
#include "kvstore/kvstore.hpp"
|
||||
#include "storage/v2/constraints/constraint_violation.hpp"
|
||||
#include "storage/v2/disk/durable_metadata.hpp"
|
||||
#include "storage/v2/disk/edge_import_mode_cache.hpp"
|
||||
#include "storage/v2/disk/rocksdb_storage.hpp"
|
||||
#include "storage/v2/edge_import_mode.hpp"
|
||||
@ -46,55 +47,6 @@ class DiskStorage final : public Storage {
|
||||
|
||||
explicit DiskAccessor(auto tag, DiskStorage *storage, IsolationLevel isolation_level, StorageMode storage_mode);
|
||||
|
||||
/// TODO: const methods?
|
||||
void LoadVerticesToMainMemoryCache();
|
||||
|
||||
void LoadVerticesFromMainStorageToEdgeImportCache();
|
||||
|
||||
void HandleMainLoadingForEdgeImportCache();
|
||||
|
||||
void LoadVerticesFromLabelIndexStorageToEdgeImportCache(LabelId label);
|
||||
|
||||
void HandleLoadingLabelForEdgeImportCache(LabelId label);
|
||||
|
||||
void LoadVerticesFromLabelPropertyIndexStorageToEdgeImportCache(LabelId label, PropertyId property);
|
||||
|
||||
void HandleLoadingLabelPropertyForEdgeImportCache(LabelId label, PropertyId property);
|
||||
|
||||
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelIndexCache(LabelId label, View view,
|
||||
std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices);
|
||||
|
||||
void LoadVerticesFromDiskLabelIndex(LabelId label, const std::unordered_set<storage::Gid> &gids,
|
||||
std::list<Delta> &index_deltas, utils::SkipList<Vertex> *indexed_vertices);
|
||||
|
||||
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
|
||||
LabelId label, PropertyId property, View view, std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices, const auto &label_property_filter);
|
||||
|
||||
void LoadVerticesFromDiskLabelPropertyIndex(LabelId label, PropertyId property,
|
||||
const std::unordered_set<storage::Gid> &gids,
|
||||
std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices,
|
||||
const auto &label_property_filter);
|
||||
|
||||
void LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(LabelId label, PropertyId property,
|
||||
const std::unordered_set<storage::Gid> &gids,
|
||||
const PropertyValue &value,
|
||||
std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices);
|
||||
|
||||
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch(
|
||||
LabelId label, PropertyId property, View view, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices);
|
||||
|
||||
void LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(
|
||||
LabelId label, PropertyId property, const std::unordered_set<storage::Gid> &gids,
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices);
|
||||
|
||||
public:
|
||||
DiskAccessor(const DiskAccessor &) = delete;
|
||||
DiskAccessor &operator=(const DiskAccessor &) = delete;
|
||||
@ -197,17 +149,6 @@ class DiskStorage final : public Storage {
|
||||
|
||||
void FinalizeTransaction() override;
|
||||
|
||||
std::optional<storage::VertexAccessor> LoadVertexToLabelIndexCache(
|
||||
const std::string &key, const std::string &value, Delta *index_delta,
|
||||
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
||||
|
||||
std::optional<storage::VertexAccessor> LoadVertexToLabelPropertyIndexCache(
|
||||
const std::string &key, const std::string &value, Delta *index_delta,
|
||||
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
||||
|
||||
std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value,
|
||||
const rocksdb::Slice &ts);
|
||||
|
||||
utils::BasicResult<StorageIndexDefinitionError, void> CreateIndex(LabelId label) override;
|
||||
|
||||
utils::BasicResult<StorageIndexDefinitionError, void> CreateIndex(LabelId label, PropertyId property) override;
|
||||
@ -227,52 +168,89 @@ class DiskStorage final : public Storage {
|
||||
|
||||
UniqueConstraints::DeletionStatus DropUniqueConstraint(LabelId label,
|
||||
const std::set<PropertyId> &properties) override;
|
||||
|
||||
private:
|
||||
/// Flushes vertices and edges to the disk with the commit timestamp.
|
||||
/// At the time of calling, the commit_timestamp_ must already exist.
|
||||
/// After this method, the vertex and edge caches are cleared.
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushIndexCache();
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushDeletedVertices();
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushDeletedEdges();
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushVertices(
|
||||
const auto &vertex_acc, std::vector<std::vector<PropertyValue>> &unique_storage);
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushModifiedEdges(const auto &edge_acc);
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> ClearDanglingVertices();
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> CheckVertexConstraintsBeforeCommit(
|
||||
const Vertex &vertex, std::vector<std::vector<PropertyValue>> &unique_storage) const;
|
||||
|
||||
bool WriteVertexToVertexColumnFamily(const Vertex &vertex);
|
||||
bool WriteEdgeToEdgeColumnFamily(const std::string &serialized_edge_key, const std::string &serialized_edge_value);
|
||||
|
||||
bool WriteEdgeToConnectivityIndex(const std::string &vertex_gid, const std::string &edge_gid,
|
||||
rocksdb::ColumnFamilyHandle *handle, std::string mode);
|
||||
|
||||
bool DeleteVertexFromDisk(const std::string &vertex_gid, const std::string &vertex);
|
||||
|
||||
bool DeleteEdgeFromEdgeColumnFamily(const std::string &edge_gid);
|
||||
bool DeleteEdgeFromDisk(const std::string &edge_gid, const std::string &src_vertex_gid,
|
||||
const std::string &dst_vertex_gid);
|
||||
bool DeleteEdgeFromConnectivityIndex(const std::string &vertex_gid, const std::string &edge_gid,
|
||||
rocksdb::ColumnFamilyHandle *handle, std::string mode);
|
||||
};
|
||||
|
||||
std::unique_ptr<Storage::Accessor> Access(std::optional<IsolationLevel> override_isolation_level) override;
|
||||
|
||||
std::unique_ptr<Storage::Accessor> UniqueAccess(std::optional<IsolationLevel> override_isolation_level) override;
|
||||
|
||||
/// TODO: (andi) Methods working with rocksdb are scattered around DiskStorage and DiskStorage::DiskAccessor
|
||||
/// Two options:
|
||||
/// 1. move everything under DiskStorage level
|
||||
/// 2. propagate DiskStorage::DiskAccessor to vertex and edge accessor.
|
||||
/// Out of scope of this PR
|
||||
/// Flushing methods
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushIndexCache(Transaction *transaction);
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushVertices(
|
||||
Transaction *transaction, const auto &vertex_acc, std::vector<std::vector<PropertyValue>> &unique_storage);
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> CheckVertexConstraintsBeforeCommit(
|
||||
const Vertex &vertex, std::vector<std::vector<PropertyValue>> &unique_storage) const;
|
||||
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushDeletedVertices(Transaction *transaction);
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushDeletedEdges(Transaction *transaction);
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> FlushModifiedEdges(Transaction *transaction,
|
||||
const auto &edge_acc);
|
||||
[[nodiscard]] utils::BasicResult<StorageManipulationError, void> ClearDanglingVertices(Transaction *transaction);
|
||||
|
||||
/// Writing methods
|
||||
bool WriteVertexToVertexColumnFamily(Transaction *transaction, const Vertex &vertex);
|
||||
bool WriteEdgeToEdgeColumnFamily(Transaction *transaction, const std::string &serialized_edge_key,
|
||||
const std::string &serialized_edge_value);
|
||||
bool WriteEdgeToConnectivityIndex(Transaction *transaction, const std::string &vertex_gid,
|
||||
const std::string &edge_gid, rocksdb::ColumnFamilyHandle *handle, std::string mode);
|
||||
bool DeleteVertexFromDisk(Transaction *transaction, const std::string &vertex_gid, const std::string &vertex);
|
||||
bool DeleteEdgeFromEdgeColumnFamily(Transaction *transaction, const std::string &edge_gid);
|
||||
bool DeleteEdgeFromDisk(Transaction *transaction, const std::string &edge_gid, const std::string &src_vertex_gid,
|
||||
const std::string &dst_vertex_gid);
|
||||
bool DeleteEdgeFromConnectivityIndex(Transaction *transaction, const std::string &vertex_gid,
|
||||
const std::string &edge_gid, rocksdb::ColumnFamilyHandle *handle,
|
||||
std::string mode);
|
||||
|
||||
void LoadVerticesToMainMemoryCache(Transaction *transaction);
|
||||
|
||||
/// Edge import mode methods
|
||||
void LoadVerticesFromMainStorageToEdgeImportCache(Transaction *transaction);
|
||||
void HandleMainLoadingForEdgeImportCache(Transaction *transaction);
|
||||
|
||||
/// Indices methods
|
||||
/// Label-index
|
||||
void LoadVerticesFromLabelIndexStorageToEdgeImportCache(Transaction *transaction, LabelId label);
|
||||
void HandleLoadingLabelForEdgeImportCache(Transaction *transaction, LabelId label);
|
||||
void LoadVerticesFromDiskLabelIndex(Transaction *transaction, LabelId label,
|
||||
const std::unordered_set<storage::Gid> &gids, std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices);
|
||||
std::optional<storage::VertexAccessor> LoadVertexToLabelIndexCache(
|
||||
Transaction *transaction, const std::string &key, const std::string &value, Delta *index_delta,
|
||||
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
||||
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelIndexCache(Transaction *transaction, LabelId label,
|
||||
View view, std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices);
|
||||
|
||||
/// Label-property-index
|
||||
void LoadVerticesFromLabelPropertyIndexStorageToEdgeImportCache(Transaction *transaction, LabelId label,
|
||||
PropertyId property);
|
||||
void HandleLoadingLabelPropertyForEdgeImportCache(Transaction *transaction, LabelId label, PropertyId property);
|
||||
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
|
||||
Transaction *transaction, LabelId label, PropertyId property, View view, std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices, const auto &label_property_filter);
|
||||
void LoadVerticesFromDiskLabelPropertyIndex(Transaction *transaction, LabelId label, PropertyId property,
|
||||
const std::unordered_set<storage::Gid> &gids,
|
||||
std::list<Delta> &index_deltas, utils::SkipList<Vertex> *indexed_vertices,
|
||||
const auto &label_property_filter);
|
||||
std::optional<storage::VertexAccessor> LoadVertexToLabelPropertyIndexCache(
|
||||
Transaction *transaction, const std::string &key, const std::string &value, Delta *index_delta,
|
||||
utils::SkipList<storage::Vertex>::Accessor index_accessor);
|
||||
void LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(
|
||||
Transaction *transaction, LabelId label, PropertyId property, const std::unordered_set<storage::Gid> &gids,
|
||||
const PropertyValue &value, std::list<Delta> &index_deltas, utils::SkipList<Vertex> *indexed_vertices);
|
||||
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch(
|
||||
Transaction *transaction, LabelId label, PropertyId property, View view,
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices);
|
||||
void LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(
|
||||
Transaction *transaction, LabelId label, PropertyId property, const std::unordered_set<storage::Gid> &gids,
|
||||
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
|
||||
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
|
||||
utils::SkipList<Vertex> *indexed_vertices);
|
||||
|
||||
VertexAccessor CreateVertexFromDisk(Transaction *transaction, utils::SkipList<Vertex>::Accessor &accessor,
|
||||
storage::Gid gid, std::vector<LabelId> label_ids, PropertyStore properties,
|
||||
Delta *delta);
|
||||
@ -280,7 +258,6 @@ class DiskStorage final : public Storage {
|
||||
std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(Transaction *transaction, const std::string &key,
|
||||
const std::string &value, std::string &&ts);
|
||||
|
||||
/// TODO: (andi) I don't think View is necessary
|
||||
std::optional<VertexAccessor> FindVertex(Gid gid, Transaction *transaction, View view);
|
||||
|
||||
std::optional<EdgeAccessor> CreateEdgeFromDisk(const VertexAccessor *from, const VertexAccessor *to,
|
||||
@ -288,12 +265,10 @@ class DiskStorage final : public Storage {
|
||||
std::string_view properties, const std::string &old_disk_key,
|
||||
std::string &&ts);
|
||||
|
||||
/// TODO: (andi) Maybe const
|
||||
std::vector<EdgeAccessor> OutEdges(const VertexAccessor *src_vertex,
|
||||
const std::vector<EdgeTypeId> &possible_edge_types,
|
||||
const VertexAccessor *destination, Transaction *transaction, View view);
|
||||
|
||||
/// TODO: (andi) Maybe const
|
||||
std::vector<EdgeAccessor> InEdges(const VertexAccessor *dst_vertex,
|
||||
const std::vector<EdgeTypeId> &possible_edge_types, const VertexAccessor *source,
|
||||
Transaction *transaction, View view);
|
||||
@ -307,39 +282,10 @@ class DiskStorage final : public Storage {
|
||||
EdgeImportMode GetEdgeImportMode() const;
|
||||
|
||||
private:
|
||||
void LoadIndexInfoIfExists() const;
|
||||
|
||||
/// TODO (andi): Maybe good to separate these methods and durability kvstore into a separate class
|
||||
bool PersistLabelIndexCreation(LabelId label) const;
|
||||
|
||||
bool PersistLabelIndexDeletion(LabelId label) const;
|
||||
|
||||
void LoadLabelIndexInfoIfExists() const;
|
||||
|
||||
bool PersistLabelPropertyIndexAndExistenceConstraintCreation(LabelId label, PropertyId property,
|
||||
const char *key) const;
|
||||
|
||||
bool PersistLabelPropertyIndexAndExistenceConstraintDeletion(LabelId label, PropertyId property,
|
||||
const char *key) const;
|
||||
|
||||
void LoadLabelPropertyIndexInfoIfExists() const;
|
||||
|
||||
void LoadConstraintsInfoIfExists() const;
|
||||
|
||||
void LoadExistenceConstraintInfoIfExists() const;
|
||||
|
||||
bool PersistUniqueConstraintCreation(LabelId label, const std::set<PropertyId> &properties) const;
|
||||
|
||||
bool PersistUniqueConstraintDeletion(LabelId label, const std::set<PropertyId> &properties) const;
|
||||
|
||||
void LoadUniqueConstraintInfoIfExists() const;
|
||||
void LoadPersistingMetadataInfo();
|
||||
|
||||
uint64_t GetDiskSpaceUsage() const;
|
||||
|
||||
void LoadTimestampIfExists();
|
||||
|
||||
void LoadVertexAndEdgeCountIfExists();
|
||||
|
||||
[[nodiscard]] std::optional<ConstraintViolation> CheckExistingVerticesBeforeCreatingExistenceConstraint(
|
||||
LabelId label, PropertyId property) const;
|
||||
|
||||
@ -355,29 +301,26 @@ class DiskStorage final : public Storage {
|
||||
|
||||
void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/) override {}
|
||||
|
||||
void PrepareForNewEpoch(std::string prev_epoch) override {
|
||||
void PrepareForNewEpoch(std::string /*prev_epoch*/) override {
|
||||
throw utils::BasicException("Disk storage mode does not support replication.");
|
||||
}
|
||||
|
||||
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
|
||||
|
||||
EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE};
|
||||
std::unique_ptr<EdgeImportModeCache> edge_import_mode_cache_{nullptr};
|
||||
|
||||
auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config)
|
||||
auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig & /*config*/)
|
||||
-> std::unique_ptr<ReplicationClient> override {
|
||||
throw utils::BasicException("Disk storage mode does not support replication.");
|
||||
}
|
||||
|
||||
auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig &config)
|
||||
auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig & /*config*/)
|
||||
-> std::unique_ptr<ReplicationServer> override {
|
||||
throw utils::BasicException("Disk storage mode does not support replication.");
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<RocksDBStorage> kvstore_;
|
||||
std::unique_ptr<kvstore::KVStore> durability_kvstore_;
|
||||
|
||||
DurableMetadata durable_metadata_;
|
||||
EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE};
|
||||
std::unique_ptr<EdgeImportModeCache> edge_import_mode_cache_{nullptr};
|
||||
std::atomic<uint64_t> vertex_count_{0};
|
||||
};
|
||||
|
||||
|
@ -33,7 +33,7 @@ bool IsVertexUnderConstraint(const Vertex &vertex, const LabelId &constraint_lab
|
||||
|
||||
bool IsDifferentVertexWithSameConstraintLabel(const std::string &key, const Gid gid, const LabelId constraint_label) {
|
||||
const std::vector<std::string> vertex_parts = utils::Split(key, "|");
|
||||
if (std::string local_gid = vertex_parts[1]; local_gid == utils::SerializeIdType(gid)) {
|
||||
if (std::string local_gid = vertex_parts[1]; local_gid == gid.ToString()) {
|
||||
return false;
|
||||
}
|
||||
return utils::DeserializeConstraintLabelFromUniqueConstraintStorage(key) == constraint_label;
|
||||
@ -45,7 +45,7 @@ bool IsDifferentVertexWithSameConstraintLabel(const std::string &key, const Gid
|
||||
for (const auto &[vertex_gid, constraints] : transaction_entries) {
|
||||
for (const auto &[constraint_label, constraint_properties] : constraints) {
|
||||
auto key_to_delete = utils::SerializeVertexAsKeyForUniqueConstraint(constraint_label, constraint_properties,
|
||||
utils::SerializeIdType(vertex_gid));
|
||||
vertex_gid.ToString());
|
||||
if (auto status = disk_transaction.Delete(key_to_delete); !status.ok()) {
|
||||
return false;
|
||||
}
|
||||
@ -217,8 +217,7 @@ bool DiskUniqueConstraints::SyncVertexToUniqueConstraintsStorage(const Vertex &v
|
||||
kvstore_->db_->BeginTransaction(rocksdb::WriteOptions(), rocksdb::TransactionOptions()));
|
||||
|
||||
if (auto maybe_old_disk_key = utils::GetOldDiskKeyOrNull(vertex.delta); maybe_old_disk_key.has_value()) {
|
||||
spdlog::trace("Found old disk key {} for vertex {}", maybe_old_disk_key.value(),
|
||||
utils::SerializeIdType(vertex.gid));
|
||||
spdlog::trace("Found old disk key {} for vertex {}", maybe_old_disk_key.value(), vertex.gid.ToString());
|
||||
if (auto status = disk_transaction->Delete(maybe_old_disk_key.value()); !status.ok()) {
|
||||
return false;
|
||||
}
|
||||
@ -227,7 +226,7 @@ bool DiskUniqueConstraints::SyncVertexToUniqueConstraintsStorage(const Vertex &v
|
||||
for (const auto &[constraint_label, constraint_properties] : constraints_) {
|
||||
if (IsVertexUnderConstraint(vertex, constraint_label, constraint_properties)) {
|
||||
auto key = utils::SerializeVertexAsKeyForUniqueConstraint(constraint_label, constraint_properties,
|
||||
utils::SerializeIdType(vertex.gid));
|
||||
vertex.gid.ToString());
|
||||
auto value = utils::SerializeVertexAsValueForUniqueConstraint(constraint_label, vertex.labels, vertex.properties);
|
||||
if (!disk_transaction->Put(key, value).ok()) {
|
||||
return false;
|
||||
|
@ -13,6 +13,7 @@
|
||||
|
||||
#include <charconv>
|
||||
#include <functional>
|
||||
#include <string>
|
||||
#include <system_error>
|
||||
#include <type_traits>
|
||||
#include <utils/exceptions.hpp>
|
||||
@ -36,6 +37,7 @@ namespace memgraph::storage {
|
||||
uint64_t AsUint() const { return id_; } \
|
||||
int64_t AsInt() const { return utils::MemcpyCast<int64_t>(id_); } \
|
||||
static name FromString(std::string_view id) { return name{utils::ParseStringToUint64(id)}; } \
|
||||
std::string ToString() const { return std::to_string(id_); } \
|
||||
\
|
||||
private: \
|
||||
uint64_t id_; \
|
||||
|
@ -27,13 +27,6 @@
|
||||
#include "utils/typeinfo.hpp"
|
||||
#include "utils/uuid.hpp"
|
||||
|
||||
namespace memgraph::metrics {
|
||||
extern const Event SnapshotCreationLatency_us;
|
||||
|
||||
extern const Event ActiveLabelIndices;
|
||||
extern const Event ActiveLabelPropertyIndices;
|
||||
} // namespace memgraph::metrics
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
class InMemoryStorage;
|
||||
@ -59,7 +52,9 @@ Storage::Storage(Config config, StorageMode storage_mode)
|
||||
storage_mode_(storage_mode),
|
||||
indices_(config, storage_mode),
|
||||
constraints_(config, storage_mode),
|
||||
id_(config.name) {}
|
||||
id_(config.name) {
|
||||
spdlog::info("Created database with {} storage mode.", StorageModeToString(storage_mode));
|
||||
}
|
||||
|
||||
Storage::Accessor::Accessor(SharedAccess /* tag */, Storage *storage, IsolationLevel isolation_level,
|
||||
StorageMode storage_mode)
|
||||
|
@ -301,8 +301,8 @@ class Storage {
|
||||
virtual auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig &config)
|
||||
-> std::unique_ptr<ReplicationServer> = 0;
|
||||
|
||||
auto ReplicasInfo() { return repl_storage_state_.ReplicasInfo(); }
|
||||
auto GetReplicaState(std::string_view name) -> std::optional<replication::ReplicaState> {
|
||||
auto ReplicasInfo() const { return repl_storage_state_.ReplicasInfo(); }
|
||||
auto GetReplicaState(std::string_view name) const -> std::optional<replication::ReplicaState> {
|
||||
return repl_storage_state_.GetReplicaState(name);
|
||||
}
|
||||
|
||||
@ -310,7 +310,6 @@ class Storage {
|
||||
memgraph::replication::ReplicationState repl_state_;
|
||||
ReplicationStorageState repl_storage_state_;
|
||||
|
||||
public:
|
||||
// Main storage lock.
|
||||
// Accessors take a shared lock when starting, so it is possible to block
|
||||
// creation of new accessors by taking a unique lock. This is used when doing
|
||||
|
@ -84,7 +84,7 @@ struct Transaction {
|
||||
return modified_edges_.emplace(gid, modified_edge).second;
|
||||
}
|
||||
|
||||
void RemoveModifiedEdge(const Gid &gid) { modified_edges_.erase(gid); }
|
||||
bool RemoveModifiedEdge(const Gid &gid) { return modified_edges_.erase(gid) > 0U; }
|
||||
|
||||
uint64_t transaction_id;
|
||||
uint64_t start_timestamp;
|
||||
|
@ -460,8 +460,7 @@ auto VertexAccessor::BuildResultWithDisk(edge_store const &in_memory_edges, std:
|
||||
/// TODO: (andi) Maybe this check can be done in build_result without damaging anything else.
|
||||
std::erase_if(ret, [transaction = this->transaction_, view](const EdgeAccessor &edge_acc) {
|
||||
return !edge_acc.IsVisible(view) || !edge_acc.FromVertex().IsVisible(view) ||
|
||||
!edge_acc.ToVertex().IsVisible(view) ||
|
||||
transaction->edges_to_delete_.contains(utils::SerializeIdType(edge_acc.Gid()));
|
||||
!edge_acc.ToVertex().IsVisible(view) || transaction->edges_to_delete_.contains(edge_acc.Gid().ToString());
|
||||
});
|
||||
std::unordered_set<storage::Gid> in_mem_edges_set;
|
||||
in_mem_edges_set.reserve(ret.size());
|
||||
@ -470,7 +469,7 @@ auto VertexAccessor::BuildResultWithDisk(edge_store const &in_memory_edges, std:
|
||||
}
|
||||
|
||||
for (const auto &disk_edge_acc : disk_edges) {
|
||||
auto const edge_gid_str = utils::SerializeIdType(disk_edge_acc.Gid());
|
||||
auto const edge_gid_str = disk_edge_acc.Gid().ToString();
|
||||
if (in_mem_edges_set.contains(disk_edge_acc.Gid()) ||
|
||||
(view == View::NEW && transaction_->edges_to_delete_.contains(edge_gid_str))) {
|
||||
continue;
|
||||
@ -496,7 +495,6 @@ Result<EdgesVertexAccessorResult> VertexAccessor::InEdges(View view, const std::
|
||||
bool edges_modified_in_tx = !vertex_->in_edges.empty();
|
||||
|
||||
disk_edges = disk_storage->InEdges(this, edge_types, destination, transaction_, view);
|
||||
/// DiskStorage & View::OLD
|
||||
if (view == View::OLD && !edges_modified_in_tx) {
|
||||
return EdgesVertexAccessorResult{.edges = disk_edges, .expanded_count = static_cast<int64_t>(disk_edges.size())};
|
||||
}
|
||||
@ -563,7 +561,6 @@ Result<EdgesVertexAccessorResult> VertexAccessor::InEdges(View view, const std::
|
||||
if (!exists) return Error::NONEXISTENT_OBJECT;
|
||||
if (deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
/// DiskStorage & View::NEW
|
||||
if (transaction_->IsDiskStorage()) {
|
||||
return EdgesVertexAccessorResult{.edges = BuildResultWithDisk(in_edges, disk_edges, view, "IN"),
|
||||
.expanded_count = expanded_count};
|
||||
@ -587,7 +584,7 @@ Result<EdgesVertexAccessorResult> VertexAccessor::OutEdges(View view, const std:
|
||||
bool edges_modified_in_tx = !vertex_->out_edges.empty();
|
||||
|
||||
disk_edges = disk_storage->OutEdges(this, edge_types, destination, transaction_, view);
|
||||
/// DiskStorage & View::OLD
|
||||
|
||||
if (view == View::OLD && !edges_modified_in_tx) {
|
||||
return EdgesVertexAccessorResult{.edges = disk_edges, .expanded_count = static_cast<int64_t>(disk_edges.size())};
|
||||
}
|
||||
@ -652,7 +649,6 @@ Result<EdgesVertexAccessorResult> VertexAccessor::OutEdges(View view, const std:
|
||||
if (!exists) return Error::NONEXISTENT_OBJECT;
|
||||
if (deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
/// DiskStorage & View::NEW
|
||||
if (transaction_->IsDiskStorage()) {
|
||||
return EdgesVertexAccessorResult{.edges = BuildResultWithDisk(out_edges, disk_edges, view, "OUT"),
|
||||
.expanded_count = expanded_count};
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <iterator>
|
||||
#include <numeric>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
@ -31,7 +32,6 @@ namespace memgraph::utils {
|
||||
static constexpr const char *outEdgeDirection = "0";
|
||||
static constexpr const char *inEdgeDirection = "1";
|
||||
|
||||
namespace {
|
||||
struct StartEndPositions {
|
||||
size_t start;
|
||||
size_t end;
|
||||
@ -58,28 +58,6 @@ inline std::string_view FindPartOfStringView(const std::string_view str, const c
|
||||
return startEndPos.Valid() ? str.substr(startEndPos.start, startEndPos.Size()) : str;
|
||||
}
|
||||
|
||||
inline std::string_view GetViewOfFirstPartOfSplit(const std::string_view src, const char delimiter) {
|
||||
return FindPartOfStringView(src, delimiter, 1);
|
||||
}
|
||||
|
||||
inline std::string_view GetViewOfSecondPartOfSplit(const std::string_view src, const char delimiter) {
|
||||
return FindPartOfStringView(src, delimiter, 2);
|
||||
}
|
||||
|
||||
inline std::string_view GetViewOfThirdPartOfSplit(const std::string_view src, const char delimiter) {
|
||||
return FindPartOfStringView(src, delimiter, 3);
|
||||
}
|
||||
|
||||
inline std::string_view GetViewOfFourthPartOfSplit(const std::string_view src, const char delimiter) {
|
||||
return FindPartOfStringView(src, delimiter, 4);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
/// TODO: try to move this to hpp files so that we can follow jump on readings
|
||||
|
||||
inline std::string SerializeIdType(const auto &id) { return std::to_string(id.AsUint()); }
|
||||
|
||||
inline bool SerializedVertexHasLabels(const std::string &labels) { return !labels.empty(); }
|
||||
|
||||
template <typename T>
|
||||
@ -92,7 +70,7 @@ inline std::vector<std::string> TransformIDsToString(const TCollection &col) {
|
||||
std::vector<std::string> transformed_col;
|
||||
transformed_col.reserve(col.size());
|
||||
for (const auto &elem : col) {
|
||||
transformed_col.emplace_back(SerializeIdType(elem));
|
||||
transformed_col.emplace_back(elem.ToString());
|
||||
}
|
||||
return transformed_col;
|
||||
}
|
||||
@ -106,29 +84,22 @@ inline std::vector<storage::LabelId> TransformFromStringLabels(std::vector<std::
|
||||
return transformed_labels;
|
||||
}
|
||||
|
||||
/// TODO: (andi) Change to utils::Join call
|
||||
inline std::string SerializeLabels(const std::vector<std::string> &labels) {
|
||||
if (labels.empty()) {
|
||||
return "";
|
||||
}
|
||||
std::string result = labels[0];
|
||||
std::string ser_labels =
|
||||
std::accumulate(std::next(labels.begin()), labels.end(), result,
|
||||
[](const std::string &join, const auto &label_id) { return join + "," + label_id; });
|
||||
return ser_labels;
|
||||
}
|
||||
inline std::string SerializeLabels(const std::vector<std::string> &labels) { return utils::Join(labels, ","); }
|
||||
|
||||
inline std::string SerializeProperties(const storage::PropertyStore &properties) { return properties.StringBuffer(); }
|
||||
|
||||
/// TODO: andi Probably it is better to add delimiter between label,property and the rest of labels
|
||||
/// TODO: reuse PutIndexingLabelAndPropertiesFirst
|
||||
inline std::string PutIndexingLabelAndPropertyFirst(const std::string &indexing_label,
|
||||
const std::string &indexing_property,
|
||||
const std::vector<std::string> &vertex_labels) {
|
||||
std::string result = indexing_label + "," + indexing_property;
|
||||
std::string result;
|
||||
result += indexing_label;
|
||||
result += ",";
|
||||
result += indexing_property;
|
||||
|
||||
for (const auto &label : vertex_labels) {
|
||||
if (label != indexing_label) {
|
||||
result += "," + label;
|
||||
result += ",";
|
||||
result += label;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@ -138,33 +109,43 @@ inline std::string PutIndexingLabelAndPropertiesFirst(const std::string &target_
|
||||
const std::vector<std::string> &target_properties) {
|
||||
std::string result = target_label;
|
||||
for (const auto &target_property : target_properties) {
|
||||
result += "," + target_property;
|
||||
result += ",";
|
||||
result += target_property;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
inline storage::Gid ExtractSrcVertexGidFromEdgeValue(const std::string value) {
|
||||
const std::string_view src_vertex_gid_str = GetViewOfFirstPartOfSplit(value, '|');
|
||||
return storage::Gid::FromString(src_vertex_gid_str);
|
||||
return storage::Gid::FromString(FindPartOfStringView(value, '|', 1));
|
||||
}
|
||||
|
||||
inline storage::Gid ExtractDstVertexGidFromEdgeValue(const std::string value) {
|
||||
const std::string_view dst_vertex_gid_str = GetViewOfSecondPartOfSplit(value, '|');
|
||||
return storage::Gid::FromString(dst_vertex_gid_str);
|
||||
return storage::Gid::FromString(FindPartOfStringView(value, '|', 2));
|
||||
}
|
||||
|
||||
inline storage::EdgeTypeId ExtractEdgeTypeIdFromEdgeValue(const std::string_view value) {
|
||||
const std::string_view edge_type_str = GetViewOfThirdPartOfSplit(value, '|');
|
||||
return storage::EdgeTypeId::FromString(edge_type_str);
|
||||
return storage::EdgeTypeId::FromString(FindPartOfStringView(value, '|', 3));
|
||||
}
|
||||
|
||||
inline std::string_view GetPropertiesFromEdgeValue(const std::string_view value) {
|
||||
return FindPartOfStringView(value, '|', 4);
|
||||
}
|
||||
|
||||
inline std::string SerializeEdgeAsValue(const std::string &src_vertex_gid, const std::string &dst_vertex_gid,
|
||||
const storage::EdgeTypeId &edge_type, const storage::Edge *edge = nullptr) {
|
||||
auto tmp = src_vertex_gid + "|" + dst_vertex_gid + "|" + SerializeIdType(edge_type) + "|";
|
||||
std::string edge_type_str = edge_type.ToString();
|
||||
std::string result;
|
||||
result.reserve(src_vertex_gid.size() + 3 + dst_vertex_gid.size() + edge_type_str.size());
|
||||
result += src_vertex_gid;
|
||||
result += "|";
|
||||
result += dst_vertex_gid;
|
||||
result += "|";
|
||||
result += edge_type_str;
|
||||
result += "|";
|
||||
if (edge) {
|
||||
return tmp + utils::SerializeProperties(edge->properties);
|
||||
return result + utils::SerializeProperties(edge->properties);
|
||||
}
|
||||
return tmp;
|
||||
return result;
|
||||
}
|
||||
|
||||
inline std::string SerializeVertexAsValueForAuxiliaryStorages(storage::LabelId label_to_remove,
|
||||
@ -181,19 +162,15 @@ inline std::string SerializeVertexAsValueForAuxiliaryStorages(storage::LabelId l
|
||||
return result + SerializeProperties(property_store);
|
||||
}
|
||||
|
||||
inline std::string ExtractGidFromKey(const std::string &key) {
|
||||
return std::string(GetViewOfSecondPartOfSplit(key, '|'));
|
||||
}
|
||||
inline std::string_view ExtractGidFromKey(const std::string &key) { return FindPartOfStringView(key, '|', 2); }
|
||||
|
||||
inline storage::PropertyStore DeserializePropertiesFromAuxiliaryStorages(const std::string &value) {
|
||||
const std::string_view properties_str = GetViewOfSecondPartOfSplit(value, '|');
|
||||
return storage::PropertyStore::CreateFromBuffer(properties_str);
|
||||
return storage::PropertyStore::CreateFromBuffer(FindPartOfStringView(value, '|', 2));
|
||||
}
|
||||
|
||||
inline std::string SerializeVertex(const storage::Vertex &vertex) {
|
||||
std::string result = utils::SerializeLabels(TransformIDsToString(vertex.labels)) + "|";
|
||||
result += utils::SerializeIdType(vertex.gid);
|
||||
return result;
|
||||
return result + vertex.gid.ToString();
|
||||
}
|
||||
|
||||
inline std::vector<storage::LabelId> DeserializeLabelsFromMainDiskStorage(const std::string &key) {
|
||||
@ -205,26 +182,36 @@ inline std::vector<storage::LabelId> DeserializeLabelsFromMainDiskStorage(const
|
||||
}
|
||||
|
||||
inline std::vector<std::string> ExtractLabelsFromMainDiskStorage(const std::string &key) {
|
||||
return utils::Split(GetViewOfFirstPartOfSplit(key, '|'), ",");
|
||||
return utils::Split(FindPartOfStringView(key, '|', 1), ",");
|
||||
}
|
||||
|
||||
inline storage::PropertyStore DeserializePropertiesFromMainDiskStorage(const std::string_view value) {
|
||||
return storage::PropertyStore::CreateFromBuffer(value);
|
||||
}
|
||||
|
||||
inline std::string ExtractGidFromMainDiskStorage(const std::string &key) { return ExtractGidFromKey(key); }
|
||||
inline std::string_view ExtractGidFromMainDiskStorage(const std::string &key) { return ExtractGidFromKey(key); }
|
||||
|
||||
inline std::string ExtractGidFromUniqueConstraintStorage(const std::string &key) { return ExtractGidFromKey(key); }
|
||||
inline std::string_view ExtractGidFromUniqueConstraintStorage(const std::string &key) { return ExtractGidFromKey(key); }
|
||||
|
||||
inline std::string GetKeyForUniqueConstraintsDurability(storage::LabelId label,
|
||||
const std::set<storage::PropertyId> &properties) {
|
||||
std::string entry;
|
||||
entry += label.ToString();
|
||||
for (auto property : properties) {
|
||||
entry += ",";
|
||||
entry += property.ToString();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
/// Serialize vertex to string as a key in unique constraint index KV store.
|
||||
/// target_label, target_property_1, target_property_2, ... GID |
|
||||
/// commit_timestamp
|
||||
inline std::string SerializeVertexAsKeyForUniqueConstraint(const storage::LabelId &constraint_label,
|
||||
const std::set<storage::PropertyId> &constraint_properties,
|
||||
const std::string &gid) {
|
||||
auto key_for_indexing = PutIndexingLabelAndPropertiesFirst(SerializeIdType(constraint_label),
|
||||
TransformIDsToString(constraint_properties));
|
||||
return key_for_indexing + "|" + gid;
|
||||
std::string_view gid) {
|
||||
auto key_for_indexing =
|
||||
PutIndexingLabelAndPropertiesFirst(constraint_label.ToString(), TransformIDsToString(constraint_properties));
|
||||
key_for_indexing += "|";
|
||||
key_for_indexing += gid;
|
||||
return key_for_indexing;
|
||||
}
|
||||
|
||||
inline std::string SerializeVertexAsValueForUniqueConstraint(const storage::LabelId &constraint_label,
|
||||
@ -234,31 +221,29 @@ inline std::string SerializeVertexAsValueForUniqueConstraint(const storage::Labe
|
||||
}
|
||||
|
||||
inline storage::LabelId DeserializeConstraintLabelFromUniqueConstraintStorage(const std::string &key) {
|
||||
const std::string_view firstPartKey = GetViewOfFirstPartOfSplit(key, '|');
|
||||
const std::string_view constraint_key = GetViewOfFirstPartOfSplit(firstPartKey, ',');
|
||||
/// TODO: andi Change this to deserialization method directly into the LabelId class
|
||||
uint64_t labelID = 0;
|
||||
const char *endOfConstraintKey = constraint_key.data() + constraint_key.size();
|
||||
auto [ptr, ec] = std::from_chars(constraint_key.data(), endOfConstraintKey, labelID);
|
||||
if (ec != std::errc() || ptr != endOfConstraintKey) {
|
||||
throw std::invalid_argument("Failed to deserialize label id from unique constraint storage");
|
||||
}
|
||||
return storage::LabelId::FromUint(labelID);
|
||||
const std::string_view firstPartKey = FindPartOfStringView(key, '|', 1);
|
||||
const std::string_view constraint_key = FindPartOfStringView(firstPartKey, ',', 1);
|
||||
return storage::LabelId::FromString(constraint_key);
|
||||
}
|
||||
|
||||
inline storage::PropertyStore DeserializePropertiesFromUniqueConstraintStorage(const std::string &value) {
|
||||
return DeserializePropertiesFromAuxiliaryStorages(value);
|
||||
}
|
||||
|
||||
inline std::string SerializeVertexAsKeyForLabelIndex(const std::string &indexing_label, const std::string &gid) {
|
||||
return indexing_label + "|" + gid;
|
||||
inline std::string SerializeVertexAsKeyForLabelIndex(const std::string &indexing_label, std::string_view gid) {
|
||||
std::string result;
|
||||
result.reserve(indexing_label.size() + 1 + gid.size());
|
||||
result += indexing_label;
|
||||
result += "|";
|
||||
result += gid;
|
||||
return result;
|
||||
}
|
||||
|
||||
inline std::string SerializeVertexAsKeyForLabelIndex(storage::LabelId label, storage::Gid gid) {
|
||||
return SerializeVertexAsKeyForLabelIndex(SerializeIdType(label), utils::SerializeIdType(gid));
|
||||
return SerializeVertexAsKeyForLabelIndex(label.ToString(), gid.ToString());
|
||||
}
|
||||
|
||||
inline std::string ExtractGidFromLabelIndexStorage(const std::string &key) { return ExtractGidFromKey(key); }
|
||||
inline std::string_view ExtractGidFromLabelIndexStorage(const std::string &key) { return ExtractGidFromKey(key); }
|
||||
|
||||
inline std::string SerializeVertexAsValueForLabelIndex(storage::LabelId indexing_label,
|
||||
const std::vector<storage::LabelId> &vertex_labels,
|
||||
@ -268,7 +253,7 @@ inline std::string SerializeVertexAsValueForLabelIndex(storage::LabelId indexing
|
||||
|
||||
inline std::vector<storage::LabelId> DeserializeLabelsFromIndexStorage(const std::string &key,
|
||||
const std::string &value) {
|
||||
std::string labels_str{GetViewOfFirstPartOfSplit(value, '|')};
|
||||
std::string labels_str{FindPartOfStringView(value, '|', 1)};
|
||||
std::vector<storage::LabelId> labels{TransformFromStringLabels(utils::Split(labels_str, ","))};
|
||||
std::string indexing_label = key.substr(0, key.find('|'));
|
||||
labels.emplace_back(storage::LabelId::FromString(indexing_label));
|
||||
@ -286,14 +271,20 @@ inline storage::PropertyStore DeserializePropertiesFromLabelIndexStorage(const s
|
||||
|
||||
inline std::string SerializeVertexAsKeyForLabelPropertyIndex(const std::string &indexing_label,
|
||||
const std::string &indexing_property,
|
||||
const std::string &gid) {
|
||||
return indexing_label + "|" + indexing_property + "|" + gid;
|
||||
std::string_view gid) {
|
||||
std::string result;
|
||||
result.reserve(indexing_label.size() + 2 + indexing_property.size() + gid.size());
|
||||
result += indexing_label;
|
||||
result += "|";
|
||||
result += indexing_property;
|
||||
result += "|";
|
||||
result += gid;
|
||||
return result;
|
||||
}
|
||||
|
||||
inline std::string SerializeVertexAsKeyForLabelPropertyIndex(storage::LabelId label, storage::PropertyId property,
|
||||
storage::Gid gid) {
|
||||
return SerializeVertexAsKeyForLabelPropertyIndex(SerializeIdType(label), SerializeIdType(property),
|
||||
utils::SerializeIdType(gid));
|
||||
return SerializeVertexAsKeyForLabelPropertyIndex(label.ToString(), property.ToString(), gid.ToString());
|
||||
}
|
||||
|
||||
inline std::string SerializeVertexAsValueForLabelPropertyIndex(storage::LabelId indexing_label,
|
||||
@ -303,7 +294,7 @@ inline std::string SerializeVertexAsValueForLabelPropertyIndex(storage::LabelId
|
||||
}
|
||||
|
||||
inline std::string ExtractGidFromLabelPropertyIndexStorage(const std::string &key) {
|
||||
return std::string(GetViewOfThirdPartOfSplit(key, '|'));
|
||||
return std::string(FindPartOfStringView(key, '|', 3));
|
||||
}
|
||||
|
||||
inline std::vector<storage::LabelId> DeserializeLabelsFromLabelPropertyIndexStorage(const std::string &key,
|
||||
|
@ -100,10 +100,3 @@ workloads:
|
||||
proc: "tests/e2e/mock_api/procedures/"
|
||||
args: ["mock_api/test_compare_mock.py"]
|
||||
<<: *compare_mock_in_memory_cluster
|
||||
|
||||
# Disk storage doesn't work with compare mock
|
||||
# - name: "test-compare-mock on disk"
|
||||
# binary: "tests/e2e/pytest_runner.sh"
|
||||
# proc: "tests/e2e/mock_api/procedures/"
|
||||
# args: ["mock_api/test_compare_mock.py"]
|
||||
# <<: *compare_mock_disk_cluster
|
||||
|
@ -81,26 +81,3 @@ workloads:
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
proc: "tests/e2e/triggers/procedures/"
|
||||
<<: *storage_properties_edges_true_disk_cluster
|
||||
|
||||
# - name: "ON UPDATE Triggers for disk storage"
|
||||
# binary: "tests/e2e/triggers/memgraph__e2e__triggers__on_update"
|
||||
# args: ["--bolt-port", *bolt_port]
|
||||
# proc: "tests/e2e/triggers/procedures/"
|
||||
# <<: *storage_properties_edges_true_disk_cluster
|
||||
|
||||
# - name: "ON DELETE Triggers Storage Properties On Edges True for disk storage"
|
||||
# binary: "tests/e2e/triggers/memgraph__e2e__triggers__on_delete"
|
||||
# args: ["--bolt-port", *bolt_port]
|
||||
# proc: "tests/e2e/triggers/procedures/"
|
||||
# <<: *storage_properties_edges_true_disk_cluster
|
||||
|
||||
# - name: "Triggers privilege check for disk storage"
|
||||
# binary: "tests/e2e/triggers/memgraph__e2e__triggers__privileges"
|
||||
# args: ["--bolt-port", *bolt_port]
|
||||
# <<: *storage_properties_edges_true_disk_cluster
|
||||
|
||||
# - name: "ON DELETE Triggers Storage Properties On Edges False for disk storage"
|
||||
# binary: "tests/e2e/pytest_runner.sh"
|
||||
# proc: "tests/e2e/triggers/procedures/"
|
||||
# args: ["triggers/triggers_properties_false.py"]
|
||||
# <<: *storage_properties_edges_false_disk_cluster
|
||||
|
@ -33,18 +33,3 @@ workloads:
|
||||
proc: "tests/e2e/write_procedures/procedures/"
|
||||
args: ["write_procedures/read_subgraph.py"]
|
||||
<<: *in_memory_cluster
|
||||
|
||||
# TODO: has to be addressed.
|
||||
# - name: "Write procedures simple on disk"
|
||||
# binary: "tests/e2e/pytest_runner.sh"
|
||||
# proc: "tests/e2e/write_procedures/procedures/"
|
||||
# args: ["write_procedures/simple_write.py"]
|
||||
# <<: *disk_cluster
|
||||
|
||||
# TODO: Has to be addressed but currently some problem with disk storage and edges.
|
||||
# Edge case and requires refactoring of bulk detach delete.
|
||||
# - name: "Graph projection procedures on disk"
|
||||
# binary: "tests/e2e/pytest_runner.sh"
|
||||
# proc: "tests/e2e/write_procedures/procedures/"
|
||||
# args: ["write_procedures/read_subgraph.py"]
|
||||
# <<: *disk_cluster
|
||||
|
@ -60,7 +60,7 @@ TEST_F(RocksDBStorageTest, SerializeVertexGID) {
|
||||
auto acc = storage->Access();
|
||||
auto vertex = acc->CreateVertex();
|
||||
auto gid = vertex.Gid();
|
||||
ASSERT_EQ(memgraph::utils::SerializeVertex(*vertex.vertex_), "|" + memgraph::utils::SerializeIdType(gid));
|
||||
ASSERT_EQ(memgraph::utils::SerializeVertex(*vertex.vertex_), "|" + gid.ToString());
|
||||
}
|
||||
|
||||
TEST_F(RocksDBStorageTest, SerializeVertexGIDLabels) {
|
||||
@ -71,9 +71,8 @@ TEST_F(RocksDBStorageTest, SerializeVertexGIDLabels) {
|
||||
ASSERT_FALSE(vertex.AddLabel(ser_player_label).HasError());
|
||||
ASSERT_FALSE(vertex.AddLabel(ser_user_label).HasError());
|
||||
auto gid = vertex.Gid();
|
||||
ASSERT_EQ(memgraph::utils::SerializeVertex(*vertex.vertex_), std::to_string(ser_player_label.AsInt()) + "," +
|
||||
std::to_string(ser_user_label.AsInt()) + "|" +
|
||||
memgraph::utils::SerializeIdType(gid));
|
||||
ASSERT_EQ(memgraph::utils::SerializeVertex(*vertex.vertex_),
|
||||
ser_player_label.ToString() + "," + ser_user_label.ToString() + "|" + gid.ToString());
|
||||
}
|
||||
|
||||
TEST_F(RocksDBStorageTest, SerializePropertiesLocalBuffer) {
|
||||
|
@ -765,8 +765,7 @@ TYPED_TEST(ConstraintsTest, UniqueConstraintsLabelAlteration) {
|
||||
gid1 = vertex1.Gid();
|
||||
gid2 = vertex2.Gid();
|
||||
|
||||
spdlog::debug("Vertex1 gid: {} Vertex2 gid: {}\n", memgraph::utils::SerializeIdType(gid1),
|
||||
memgraph::utils::SerializeIdType(gid2));
|
||||
spdlog::debug("Vertex1 gid: {} Vertex2 gid: {}\n", gid1.ToString(), gid2.ToString());
|
||||
|
||||
ASSERT_NO_ERROR(vertex1.AddLabel(this->label2));
|
||||
ASSERT_NO_ERROR(vertex1.SetProperty(this->prop1, PropertyValue(1)));
|
||||
|
Loading…
Reference in New Issue
Block a user