Compare commits
6 Commits
master
...
property-s
Author | SHA1 | Date | |
---|---|---|---|
|
634b46a57a | ||
|
0ec44b6791 | ||
|
39e1776a74 | ||
|
0215edb4bf | ||
|
514386d616 | ||
|
826647c876 |
@ -36,6 +36,19 @@ KVStore::KVStore(std::filesystem::path storage) : pimpl_(std::make_unique<impl>(
|
||||
pimpl_->db.reset(db);
|
||||
}
|
||||
|
||||
KVStore::KVStore(std::filesystem::path storage, rocksdb::Options db_options) : pimpl_(std::make_unique<impl>()) {
|
||||
pimpl_->storage = storage;
|
||||
pimpl_->options = std::move(db_options);
|
||||
if (!utils::EnsureDir(pimpl_->storage))
|
||||
throw KVStoreError("Folder for the key-value store " + pimpl_->storage.string() + " couldn't be initialized!");
|
||||
rocksdb::DB *db = nullptr;
|
||||
auto s = rocksdb::DB::Open(pimpl_->options, storage.c_str(), &db);
|
||||
if (!s.ok())
|
||||
throw KVStoreError("RocksDB couldn't be initialized inside " + storage.string() + " -- " +
|
||||
std::string(s.ToString()));
|
||||
pimpl_->db.reset(db);
|
||||
}
|
||||
|
||||
KVStore::~KVStore() {
|
||||
if (pimpl_ == nullptr) return;
|
||||
spdlog::debug("Destroying KVStore at {}", pimpl_->storage.string());
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <rocksdb/options.h>
|
||||
#include <filesystem>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
@ -43,6 +44,7 @@ class KVStore final {
|
||||
* storage directory because that will lead to undefined behaviour.
|
||||
*/
|
||||
explicit KVStore(std::filesystem::path storage);
|
||||
explicit KVStore(std::filesystem::path storage, rocksdb::Options db_options);
|
||||
|
||||
KVStore(const KVStore &other) = delete;
|
||||
KVStore(KVStore &&other);
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include "storage/v2/durability/durability.hpp"
|
||||
#include "system/system.hpp"
|
||||
#include "telemetry/telemetry.hpp"
|
||||
#include "utils/on_scope_exit.hpp"
|
||||
#include "utils/signals.hpp"
|
||||
#include "utils/sysinfo/memory.hpp"
|
||||
#include "utils/system_info.hpp"
|
||||
@ -145,6 +146,8 @@ int main(int argc, char **argv) {
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&memgraph::utils::TerminateHandler);
|
||||
|
||||
memgraph::utils::OnScopeExit deinit_pds([]() { memgraph::storage::PDS::Deinit(); });
|
||||
|
||||
// Initialize Python
|
||||
auto *program_name = Py_DecodeLocale(argv[0], nullptr);
|
||||
MG_ASSERT(program_name);
|
||||
|
@ -68,7 +68,7 @@ ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::stri
|
||||
parser = std::make_unique<frontend::opencypher::Parser>(stripped_query.query());
|
||||
} catch (const SyntaxException &e) {
|
||||
// There is a syntax exception in the stripped query. Re-run the parser
|
||||
// on the original query to get an appropriate error messsage.
|
||||
// on the original query to get an appropriate error message.
|
||||
parser = std::make_unique<frontend::opencypher::Parser>(query_string);
|
||||
|
||||
// If an exception was not thrown here, the stripper messed something
|
||||
|
@ -298,6 +298,7 @@ class Interpreter final {
|
||||
query_executions_.clear();
|
||||
system_transaction_.reset();
|
||||
transaction_queries_->clear();
|
||||
current_timeout_timer_.reset();
|
||||
if (current_db_.db_acc_ && current_db_.db_acc_->is_deleting()) {
|
||||
current_db_.db_acc_.reset();
|
||||
}
|
||||
|
@ -43,6 +43,7 @@ add_library(mg-storage-v2 STATIC
|
||||
replication/rpc.cpp
|
||||
replication/replication_storage_state.cpp
|
||||
inmemory/replication/recovery.cpp
|
||||
property_disk_store.cpp
|
||||
)
|
||||
|
||||
target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk mg-events mg-memory)
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -58,7 +58,7 @@ void ExistenceConstraints::LoadExistenceConstraints(const std::vector<std::strin
|
||||
|
||||
[[nodiscard]] std::optional<ConstraintViolation> ExistenceConstraints::ValidateVertexOnConstraint(
|
||||
const Vertex &vertex, const LabelId &label, const PropertyId &property) {
|
||||
if (!vertex.deleted && utils::Contains(vertex.labels, label) && !vertex.properties.HasProperty(property)) {
|
||||
if (!vertex.deleted && utils::Contains(vertex.labels, label) && !vertex.HasProperty(property)) {
|
||||
return ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label, std::set<PropertyId>{property}};
|
||||
}
|
||||
return std::nullopt;
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -90,12 +90,13 @@ bool DiskLabelIndex::SyncVertexToLabelIndexStorage(const Vertex &vertex, uint64_
|
||||
if (!utils::Contains(vertex.labels, index_label)) {
|
||||
continue;
|
||||
}
|
||||
if (!disk_transaction
|
||||
->Put(utils::SerializeVertexAsKeyForLabelIndex(index_label, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelIndex(index_label, vertex.labels, vertex.properties))
|
||||
.ok()) {
|
||||
return false;
|
||||
}
|
||||
// TODO: Re-enable
|
||||
// if (!disk_transaction
|
||||
// ->Put(utils::SerializeVertexAsKeyForLabelIndex(index_label, vertex.gid),
|
||||
// utils::SerializeVertexAsValueForLabelIndex(index_label, vertex.labels, vertex.properties))
|
||||
// .ok()) {
|
||||
// return false;
|
||||
// }
|
||||
}
|
||||
|
||||
return CommitWithTimestamp(disk_transaction.get(), commit_timestamp);
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -20,7 +20,9 @@ namespace memgraph::storage {
|
||||
namespace {
|
||||
|
||||
bool IsVertexIndexedByLabelProperty(const Vertex &vertex, LabelId label, PropertyId property) {
|
||||
return utils::Contains(vertex.labels, label) && vertex.properties.HasProperty(property);
|
||||
// TODO: Re-enable
|
||||
// return utils::Contains(vertex.labels, label) && vertex.properties.HasProperty(property);
|
||||
return false;
|
||||
}
|
||||
|
||||
[[nodiscard]] bool ClearTransactionEntriesWithRemovedIndexingLabel(
|
||||
@ -94,12 +96,14 @@ bool DiskLabelPropertyIndex::SyncVertexToLabelPropertyIndexStorage(const Vertex
|
||||
}
|
||||
for (const auto &[index_label, index_property] : index_) {
|
||||
if (IsVertexIndexedByLabelProperty(vertex, index_label, index_property)) {
|
||||
if (!disk_transaction
|
||||
->Put(utils::SerializeVertexAsKeyForLabelPropertyIndex(index_label, index_property, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelPropertyIndex(index_label, vertex.labels, vertex.properties))
|
||||
.ok()) {
|
||||
return false;
|
||||
}
|
||||
// TODO: Re-enable
|
||||
// if (!disk_transaction
|
||||
// ->Put(utils::SerializeVertexAsKeyForLabelPropertyIndex(index_label, index_property, vertex.gid),
|
||||
// utils::SerializeVertexAsValueForLabelPropertyIndex(index_label, vertex.labels,
|
||||
// vertex.properties))
|
||||
// .ok()) {
|
||||
// return false;
|
||||
// }
|
||||
}
|
||||
}
|
||||
return CommitWithTimestamp(disk_transaction.get(), commit_timestamp);
|
||||
|
@ -172,7 +172,7 @@ bool VertexHasLabel(const Vertex &vertex, LabelId label, Transaction *transactio
|
||||
|
||||
PropertyValue GetVertexProperty(const Vertex &vertex, PropertyId property, Transaction *transaction, View view) {
|
||||
bool deleted = vertex.deleted;
|
||||
PropertyValue value = vertex.properties.GetProperty(property);
|
||||
PropertyValue value = vertex.GetProperty(property);
|
||||
Delta *delta = vertex.delta;
|
||||
ApplyDeltasForRead(transaction, delta, view, [&deleted, &value, property](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
@ -628,10 +628,11 @@ std::unordered_set<Gid> DiskStorage::MergeVerticesFromMainCacheWithLabelIndexCac
|
||||
spdlog::trace("Loaded vertex with gid: {} from main index storage to label index", vertex.gid.ToString());
|
||||
uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
|
||||
/// TODO: here are doing serialization and then later deserialization again -> expensive
|
||||
LoadVertexToLabelIndexCache(transaction, utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties),
|
||||
CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::nullopt, ts),
|
||||
indexed_vertices->access());
|
||||
// TODO: Re-enable
|
||||
// LoadVertexToLabelIndexCache(transaction, utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid),
|
||||
// utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels,
|
||||
// vertex.properties), CreateDeleteDeserializedIndexObjectDelta(index_deltas,
|
||||
// std::nullopt, ts), indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
return gids;
|
||||
@ -678,10 +679,11 @@ std::unordered_set<Gid> DiskStorage::MergeVerticesFromMainCacheWithLabelProperty
|
||||
gids.insert(vertex.gid);
|
||||
if (label_property_filter(vertex, label, property, view)) {
|
||||
uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
|
||||
LoadVertexToLabelPropertyIndexCache(
|
||||
transaction, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
||||
CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::nullopt, ts), indexed_vertices->access());
|
||||
// TODO: Re-enable
|
||||
// LoadVertexToLabelPropertyIndexCache(
|
||||
// transaction, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
||||
// utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
||||
// CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::nullopt, ts), indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
|
||||
@ -763,10 +765,11 @@ std::unordered_set<Gid> DiskStorage::MergeVerticesFromMainCacheWithLabelProperty
|
||||
if (VertexHasLabel(vertex, label, transaction, view) &&
|
||||
IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) {
|
||||
uint64_t ts = utils::GetEarliestTimestamp(vertex.delta);
|
||||
LoadVertexToLabelPropertyIndexCache(
|
||||
transaction, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
||||
utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
||||
CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::nullopt, ts), indexed_vertices->access());
|
||||
// TODO: Re-enable
|
||||
// LoadVertexToLabelPropertyIndexCache(
|
||||
// transaction, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid),
|
||||
// utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties),
|
||||
// CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::nullopt, ts), indexed_vertices->access());
|
||||
}
|
||||
}
|
||||
return gids;
|
||||
@ -1028,14 +1031,15 @@ bool DiskStorage::WriteVertexToVertexColumnFamily(Transaction *transaction, cons
|
||||
MG_ASSERT(transaction->commit_timestamp, "Writing vertex to disk but commit timestamp not set.");
|
||||
auto commit_ts = transaction->commit_timestamp->load(std::memory_order_relaxed);
|
||||
const auto ser_vertex = utils::SerializeVertex(vertex);
|
||||
auto status = transaction->disk_transaction_->Put(kvstore_->vertex_chandle, ser_vertex,
|
||||
utils::SerializeProperties(vertex.properties));
|
||||
if (status.ok()) {
|
||||
spdlog::trace("rocksdb: Saved vertex with key {} and ts {} to vertex column family", ser_vertex, commit_ts);
|
||||
return true;
|
||||
}
|
||||
spdlog::error("rocksdb: Failed to save vertex with key {} and ts {} to vertex column family", ser_vertex, commit_ts);
|
||||
return false;
|
||||
// TODO: Re-enable
|
||||
// auto status = transaction->disk_transaction_->Put(kvstore_->vertex_chandle, ser_vertex,
|
||||
// utils::SerializeProperties(vertex.properties));
|
||||
// if (status.ok()) {
|
||||
// spdlog::trace("rocksdb: Saved vertex with key {} and ts {} to vertex column family", ser_vertex, commit_ts);
|
||||
return true;
|
||||
// }
|
||||
// spdlog::error("rocksdb: Failed to save vertex with key {} and ts {} to vertex column family", ser_vertex,
|
||||
// commit_ts); return false;
|
||||
}
|
||||
|
||||
bool DiskStorage::WriteEdgeToEdgeColumnFamily(Transaction *transaction, const std::string &serialized_edge_key,
|
||||
@ -1358,7 +1362,8 @@ VertexAccessor DiskStorage::CreateVertexFromDisk(Transaction *transaction, utils
|
||||
MG_ASSERT(inserted, "The vertex must be inserted here!");
|
||||
MG_ASSERT(it != accessor.end(), "Invalid Vertex accessor!");
|
||||
it->labels = std::move(label_ids);
|
||||
it->properties = std::move(properties);
|
||||
// TODO: Re-enable
|
||||
// it->properties = std::move(properties);
|
||||
delta->prev.Set(&*it);
|
||||
return {&*it, this, transaction};
|
||||
}
|
||||
@ -1419,7 +1424,8 @@ std::optional<EdgeAccessor> DiskStorage::CreateEdgeFromDisk(const VertexAccessor
|
||||
MG_ASSERT(it != acc.end(), "Invalid Edge accessor!");
|
||||
edge = EdgeRef(&*it);
|
||||
delta->prev.Set(&*it);
|
||||
edge.ptr->properties.SetBuffer(properties);
|
||||
// TODO Re-enable
|
||||
// edge.ptr->properties.SetBuffer(properties);
|
||||
}
|
||||
|
||||
ModifiedEdgeInfo modified_edge(Delta::Action::DELETE_DESERIALIZED_OBJECT, from_vertex->gid, to_vertex->gid, edge_type,
|
||||
|
@ -28,7 +28,7 @@ namespace {
|
||||
|
||||
bool IsVertexUnderConstraint(const Vertex &vertex, const LabelId &constraint_label,
|
||||
const std::set<PropertyId> &constraint_properties) {
|
||||
return utils::Contains(vertex.labels, constraint_label) && vertex.properties.HasAllProperties(constraint_properties);
|
||||
return utils::Contains(vertex.labels, constraint_label) && vertex.HasAllProperties(constraint_properties);
|
||||
}
|
||||
|
||||
bool IsDifferentVertexWithSameConstraintLabel(const std::string &key, const Gid gid, const LabelId constraint_label) {
|
||||
@ -105,7 +105,7 @@ std::optional<ConstraintViolation> DiskUniqueConstraints::Validate(
|
||||
std::optional<ConstraintViolation> DiskUniqueConstraints::TestIfVertexSatisifiesUniqueConstraint(
|
||||
const Vertex &vertex, std::vector<std::vector<PropertyValue>> &unique_storage, const LabelId &constraint_label,
|
||||
const std::set<PropertyId> &constraint_properties) const {
|
||||
auto property_values = vertex.properties.ExtractPropertyValues(constraint_properties);
|
||||
auto property_values = vertex.ExtractPropertyValues(constraint_properties);
|
||||
|
||||
/// TODO: better naming. Is vertex unique
|
||||
if (property_values.has_value() &&
|
||||
@ -227,10 +227,11 @@ bool DiskUniqueConstraints::SyncVertexToUniqueConstraintsStorage(const Vertex &v
|
||||
if (IsVertexUnderConstraint(vertex, constraint_label, constraint_properties)) {
|
||||
auto key = utils::SerializeVertexAsKeyForUniqueConstraint(constraint_label, constraint_properties,
|
||||
vertex.gid.ToString());
|
||||
auto value = utils::SerializeVertexAsValueForUniqueConstraint(constraint_label, vertex.labels, vertex.properties);
|
||||
if (!disk_transaction->Put(key, value).ok()) {
|
||||
return false;
|
||||
}
|
||||
// TODO: Re-enable
|
||||
// auto value = utils::SerializeVertexAsValueForUniqueConstraint(constraint_label, vertex.labels,
|
||||
// vertex.properties); if (!disk_transaction->Put(key, value).ok()) {
|
||||
// return false;
|
||||
// }
|
||||
}
|
||||
}
|
||||
/// TODO: extract and better message
|
||||
|
@ -277,7 +277,6 @@ void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Couldn't read the size of edge properties!");
|
||||
auto &props = it->properties;
|
||||
read_properties.clear();
|
||||
read_properties.reserve(*props_size);
|
||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||
@ -287,7 +286,7 @@ void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &
|
||||
if (!value) throw RecoveryFailure("Couldn't read edge property value!");
|
||||
read_properties.emplace_back(get_property_from_id(*key), std::move(*value));
|
||||
}
|
||||
props.InitProperties(std::move(read_properties));
|
||||
it->InitProperties(std::move(read_properties));
|
||||
}
|
||||
} else {
|
||||
spdlog::debug("Ensuring edge {} doesn't have any properties.", *gid);
|
||||
@ -370,7 +369,6 @@ uint64_t LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Couldn't read size of vertex properties!");
|
||||
auto &props = it->properties;
|
||||
read_properties.clear();
|
||||
read_properties.reserve(*props_size);
|
||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||
@ -380,7 +378,7 @@ uint64_t LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<
|
||||
if (!value) throw RecoveryFailure("Couldn't read vertex property value!");
|
||||
read_properties.emplace_back(get_property_from_id(*key), std::move(*value));
|
||||
}
|
||||
props.InitProperties(std::move(read_properties));
|
||||
it->InitProperties(std::move(read_properties));
|
||||
}
|
||||
|
||||
// Skip in edges.
|
||||
@ -720,7 +718,6 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Couldn't read the size of properties!");
|
||||
auto &props = it->properties;
|
||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||
auto key = snapshot.ReadUint();
|
||||
if (!key) throw RecoveryFailure("Couldn't read edge property id!");
|
||||
@ -728,7 +725,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
|
||||
if (!value) throw RecoveryFailure("Couldn't read edge property value!");
|
||||
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for edge {}.",
|
||||
name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||
props.SetProperty(get_property_from_id(*key), *value);
|
||||
it->SetProperty(get_property_from_id(*key), *value);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -796,7 +793,6 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Couldn't read the size of properties!");
|
||||
auto &props = it->properties;
|
||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||
auto key = snapshot.ReadUint();
|
||||
if (!key) throw RecoveryFailure("Couldn't read the vertex property id!");
|
||||
@ -804,7 +800,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
|
||||
if (!value) throw RecoveryFailure("Couldn't read the vertex property value!");
|
||||
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for vertex {}.",
|
||||
name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||
props.SetProperty(get_property_from_id(*key), *value);
|
||||
it->SetProperty(get_property_from_id(*key), *value);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -597,7 +597,7 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, SalientConf
|
||||
// TODO (mferencevic): Mitigate the memory allocation introduced here
|
||||
// (with the `GetProperty` call). It is the only memory allocation in the
|
||||
// entire WAL file writing logic.
|
||||
encoder->WritePropertyValue(vertex.properties.GetProperty(delta.property.key));
|
||||
encoder->WritePropertyValue(vertex.GetProperty(delta.property.key));
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
@ -646,7 +646,7 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, const Delta
|
||||
// TODO (mferencevic): Mitigate the memory allocation introduced here
|
||||
// (with the `GetProperty` call). It is the only memory allocation in the
|
||||
// entire WAL file writing logic.
|
||||
encoder->WritePropertyValue(edge.properties.GetProperty(delta.property.key));
|
||||
encoder->WritePropertyValue(edge.GetProperty(delta.property.key));
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
@ -842,7 +842,7 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst
|
||||
auto property_id = PropertyId::FromUint(name_id_mapper->NameToId(delta.vertex_edge_set_property.property));
|
||||
auto &property_value = delta.vertex_edge_set_property.value;
|
||||
|
||||
vertex->properties.SetProperty(property_id, property_value);
|
||||
vertex->SetProperty(property_id, property_value);
|
||||
|
||||
break;
|
||||
}
|
||||
@ -926,7 +926,7 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst
|
||||
if (edge == edge_acc.end()) throw RecoveryFailure("The edge doesn't exist!");
|
||||
auto property_id = PropertyId::FromUint(name_id_mapper->NameToId(delta.vertex_edge_set_property.property));
|
||||
auto &property_value = delta.vertex_edge_set_property.value;
|
||||
edge->properties.SetProperty(property_id, property_value);
|
||||
edge->SetProperty(property_id, property_value);
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::TRANSACTION_END:
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -19,6 +19,10 @@
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/rw_spin_lock.hpp"
|
||||
|
||||
#include "storage/v2/property_disk_store.hpp"
|
||||
|
||||
// #include "storage/v2/property_disk_store.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
struct Vertex;
|
||||
@ -30,16 +34,120 @@ struct Edge {
|
||||
"Edge must be created with an initial DELETE_OBJECT delta!");
|
||||
}
|
||||
|
||||
~Edge() {
|
||||
// TODO: Don't want to do this here
|
||||
if (!moved) ClearProperties();
|
||||
}
|
||||
|
||||
Edge(Edge &) = delete;
|
||||
Edge &operator=(Edge &) = delete;
|
||||
Edge(Edge &&) = default;
|
||||
Edge &operator=(Edge &&) = delete;
|
||||
|
||||
Gid gid;
|
||||
|
||||
PropertyStore properties;
|
||||
// PropertyStore properties;
|
||||
|
||||
mutable utils::RWSpinLock lock;
|
||||
bool deleted;
|
||||
// uint8_t PAD;
|
||||
// uint16_t PAD;
|
||||
|
||||
bool has_prop;
|
||||
|
||||
class HotFixMove {
|
||||
public:
|
||||
HotFixMove() {}
|
||||
HotFixMove(HotFixMove &&other) noexcept {
|
||||
if (this != &other) {
|
||||
// We want only the latest object to be marked as not-moved; while all previous should be marked as moved
|
||||
moved = false;
|
||||
other.moved = true;
|
||||
}
|
||||
}
|
||||
HotFixMove(HotFixMove &) = delete;
|
||||
HotFixMove &operator=(HotFixMove &) = delete;
|
||||
HotFixMove &operator=(HotFixMove &&) = delete;
|
||||
|
||||
operator bool() const { return moved; }
|
||||
|
||||
private:
|
||||
bool moved{false};
|
||||
} moved;
|
||||
|
||||
Delta *delta;
|
||||
|
||||
Gid HotFixForGID() const { return Gid::FromUint(gid.AsUint() + (1 << 31)); }
|
||||
|
||||
PropertyValue GetProperty(PropertyId property) const {
|
||||
if (!has_prop) return {};
|
||||
const auto prop = PDS::get()->Get(HotFixForGID(), property);
|
||||
if (prop) return *prop;
|
||||
return {};
|
||||
}
|
||||
|
||||
bool SetProperty(PropertyId property, const PropertyValue &value) {
|
||||
if (!has_prop) return {};
|
||||
return PDS::get()->Set(HotFixForGID(), property, value);
|
||||
}
|
||||
|
||||
template <typename TContainer>
|
||||
bool InitProperties(const TContainer &properties) {
|
||||
auto *pds = PDS::get();
|
||||
for (const auto &[property, value] : properties) {
|
||||
if (value.IsNull()) {
|
||||
continue;
|
||||
}
|
||||
if (!pds->Set(HotFixForGID(), property, value)) {
|
||||
return false;
|
||||
}
|
||||
has_prop = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void ClearProperties() {
|
||||
if (!has_prop) return;
|
||||
has_prop = false;
|
||||
auto *pds = PDS::get();
|
||||
pds->Clear(HotFixForGID());
|
||||
}
|
||||
|
||||
std::map<PropertyId, PropertyValue> Properties() {
|
||||
if (!has_prop) return {};
|
||||
return PDS::get()->Get(HotFixForGID());
|
||||
}
|
||||
|
||||
std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>> UpdateProperties(
|
||||
std::map<PropertyId, PropertyValue> &properties) {
|
||||
if (!has_prop && properties.empty()) return {};
|
||||
auto old_properties = Properties();
|
||||
ClearProperties();
|
||||
|
||||
std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>> id_old_new_change;
|
||||
id_old_new_change.reserve(properties.size() + old_properties.size());
|
||||
for (const auto &[prop_id, new_value] : properties) {
|
||||
if (!old_properties.contains(prop_id)) {
|
||||
id_old_new_change.emplace_back(prop_id, PropertyValue(), new_value);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto &[old_key, old_value] : old_properties) {
|
||||
auto [it, inserted] = properties.emplace(old_key, old_value);
|
||||
if (!inserted) {
|
||||
auto &new_value = it->second;
|
||||
id_old_new_change.emplace_back(it->first, old_value, new_value);
|
||||
}
|
||||
}
|
||||
|
||||
MG_ASSERT(InitProperties(properties));
|
||||
return id_old_new_change;
|
||||
}
|
||||
|
||||
uint64_t PropertySize(PropertyId property) const {
|
||||
if (!has_prop) return {};
|
||||
return PDS::get()->GetSize(HotFixForGID(), property);
|
||||
}
|
||||
};
|
||||
|
||||
static_assert(alignof(Edge) >= 8, "The Edge should be aligned to at least 8!");
|
||||
|
@ -128,11 +128,11 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
|
||||
if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;
|
||||
|
||||
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
|
||||
using ReturnType = decltype(edge_.ptr->properties.GetProperty(property));
|
||||
using ReturnType = decltype(edge_.ptr->GetProperty(property));
|
||||
std::optional<ReturnType> current_value;
|
||||
utils::AtomicMemoryBlock atomic_memory_block{
|
||||
[¤t_value, &property, &value, transaction = transaction_, edge = edge_]() {
|
||||
current_value.emplace(edge.ptr->properties.GetProperty(property));
|
||||
current_value.emplace(edge.ptr->GetProperty(property));
|
||||
// We could skip setting the value if the previous one is the same to the new
|
||||
// one. This would save some memory as a delta would not be created as well as
|
||||
// avoid copying the value. The reason we are not doing that is because the
|
||||
@ -140,7 +140,7 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
|
||||
// "modify in-place". Additionally, the created delta will make other
|
||||
// transactions get a SERIALIZATION_ERROR.
|
||||
CreateAndLinkDelta(transaction, edge.ptr, Delta::SetPropertyTag(), property, *current_value);
|
||||
edge.ptr->properties.SetProperty(property, value);
|
||||
edge.ptr->SetProperty(property, value);
|
||||
}};
|
||||
std::invoke(atomic_memory_block);
|
||||
|
||||
@ -162,7 +162,7 @@ Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, st
|
||||
|
||||
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
if (!edge_.ptr->properties.InitProperties(properties)) return false;
|
||||
if (!edge_.ptr->InitProperties(properties)) return false;
|
||||
utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() {
|
||||
for (const auto &[property, _] : properties) {
|
||||
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue());
|
||||
@ -184,11 +184,11 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> EdgeAc
|
||||
|
||||
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
using ReturnType = decltype(edge_.ptr->properties.UpdateProperties(properties));
|
||||
using ReturnType = decltype(edge_.ptr->UpdateProperties(properties));
|
||||
std::optional<ReturnType> id_old_new_change;
|
||||
utils::AtomicMemoryBlock atomic_memory_block{
|
||||
[transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change]() {
|
||||
id_old_new_change.emplace(edge_.ptr->properties.UpdateProperties(properties));
|
||||
id_old_new_change.emplace(edge_.ptr->UpdateProperties(properties));
|
||||
for (auto &[property, old_value, new_value] : *id_old_new_change) {
|
||||
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
|
||||
}
|
||||
@ -207,15 +207,15 @@ Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
|
||||
|
||||
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
using ReturnType = decltype(edge_.ptr->properties.Properties());
|
||||
using ReturnType = decltype(edge_.ptr->Properties());
|
||||
std::optional<ReturnType> properties;
|
||||
utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() {
|
||||
properties.emplace(edge_.ptr->properties.Properties());
|
||||
properties.emplace(edge_.ptr->Properties());
|
||||
for (const auto &property : *properties) {
|
||||
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second);
|
||||
}
|
||||
|
||||
edge_.ptr->properties.ClearProperties();
|
||||
edge_.ptr->ClearProperties();
|
||||
}};
|
||||
std::invoke(atomic_memory_block);
|
||||
|
||||
@ -231,7 +231,7 @@ Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view)
|
||||
{
|
||||
auto guard = std::shared_lock{edge_.ptr->lock};
|
||||
deleted = edge_.ptr->deleted;
|
||||
value.emplace(edge_.ptr->properties.GetProperty(property));
|
||||
value.emplace(edge_.ptr->GetProperty(property));
|
||||
delta = edge_.ptr->delta;
|
||||
}
|
||||
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &value, property](const Delta &delta) {
|
||||
@ -271,7 +271,7 @@ Result<uint64_t> EdgeAccessor::GetPropertySize(PropertyId property, View view) c
|
||||
auto guard = std::shared_lock{edge_.ptr->lock};
|
||||
Delta *delta = edge_.ptr->delta;
|
||||
if (!delta) {
|
||||
return edge_.ptr->properties.PropertySize(property);
|
||||
return edge_.ptr->PropertySize(property);
|
||||
}
|
||||
|
||||
auto property_result = this->GetProperty(property, view);
|
||||
@ -295,7 +295,7 @@ Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::Properties(View view)
|
||||
{
|
||||
auto guard = std::shared_lock{edge_.ptr->lock};
|
||||
deleted = edge_.ptr->deleted;
|
||||
properties = edge_.ptr->properties.Properties();
|
||||
properties = edge_.ptr->Properties();
|
||||
delta = edge_.ptr->delta;
|
||||
}
|
||||
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &properties](const Delta &delta) {
|
||||
|
@ -121,7 +121,7 @@ inline bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, Prop
|
||||
has_label = utils::Contains(vertex.labels, label);
|
||||
// Avoid IsPropertyEqual if already not possible
|
||||
if (delta == nullptr && (deleted || !has_label)) return false;
|
||||
current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value);
|
||||
current_value_equal_to_value = vertex.IsPropertyEqual(key, value);
|
||||
}
|
||||
|
||||
if (!deleted && has_label && current_value_equal_to_value) {
|
||||
@ -186,7 +186,7 @@ inline bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label,
|
||||
auto guard = std::shared_lock{vertex.lock};
|
||||
deleted = vertex.deleted;
|
||||
has_label = utils::Contains(vertex.labels, label);
|
||||
current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value);
|
||||
current_value_equal_to_value = vertex.IsPropertyEqual(key, value);
|
||||
delta = vertex.delta;
|
||||
}
|
||||
|
||||
@ -246,7 +246,7 @@ inline void TryInsertLabelPropertyIndex(Vertex &vertex, std::pair<LabelId, Prope
|
||||
if (vertex.deleted || !utils::Contains(vertex.labels, label_property_pair.first)) {
|
||||
return;
|
||||
}
|
||||
auto value = vertex.properties.GetProperty(label_property_pair.second);
|
||||
auto value = vertex.GetProperty(label_property_pair.second);
|
||||
if (value.IsNull()) {
|
||||
return;
|
||||
}
|
||||
|
@ -90,7 +90,7 @@ void InMemoryLabelPropertyIndex::UpdateOnAddLabel(LabelId added_label, Vertex *v
|
||||
if (label_prop.first != added_label) {
|
||||
continue;
|
||||
}
|
||||
auto prop_value = vertex_after_update->properties.GetProperty(label_prop.second);
|
||||
auto prop_value = vertex_after_update->GetProperty(label_prop.second);
|
||||
if (!prop_value.IsNull()) {
|
||||
auto acc = storage.access();
|
||||
acc.insert(Entry{std::move(prop_value), vertex_after_update, tx.start_timestamp});
|
||||
|
@ -1044,7 +1044,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
const auto &properties = index_stats.property_label.l2p.find(current->label.value);
|
||||
if (properties != index_stats.property_label.l2p.end()) {
|
||||
for (const auto &property : properties->second) {
|
||||
auto current_value = vertex->properties.GetProperty(property);
|
||||
auto current_value = vertex->GetProperty(property);
|
||||
if (!current_value.IsNull()) {
|
||||
label_property_cleanup[current->label.value].emplace_back(std::move(current_value), vertex);
|
||||
}
|
||||
@ -1065,13 +1065,13 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
// value
|
||||
const auto &labels = index_stats.property_label.p2l.find(current->property.key);
|
||||
if (labels != index_stats.property_label.p2l.end()) {
|
||||
auto current_value = vertex->properties.GetProperty(current->property.key);
|
||||
auto current_value = vertex->GetProperty(current->property.key);
|
||||
if (!current_value.IsNull()) {
|
||||
property_cleanup[current->property.key].emplace_back(std::move(current_value), vertex);
|
||||
}
|
||||
}
|
||||
// Setting the correct value
|
||||
vertex->properties.SetProperty(current->property.key, *current->property.value);
|
||||
vertex->SetProperty(current->property.key, *current->property.value);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_IN_EDGE: {
|
||||
@ -1146,7 +1146,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
|
||||
switch (current->action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
edge->properties.SetProperty(current->property.key, *current->property.value);
|
||||
edge->SetProperty(current->property.key, *current->property.value);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||
|
@ -64,7 +64,7 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c
|
||||
|
||||
size_t i = 0;
|
||||
for (const auto &property : properties) {
|
||||
current_value_equal_to_value[i] = vertex.properties.IsPropertyEqual(property, value_array[i]);
|
||||
current_value_equal_to_value[i] = vertex.IsPropertyEqual(property, value_array[i]);
|
||||
property_array.values[i] = property;
|
||||
i++;
|
||||
}
|
||||
@ -155,7 +155,7 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std::
|
||||
// If delta we need to fetch for later processing
|
||||
size_t i = 0;
|
||||
for (const auto &property : properties) {
|
||||
current_value_equal_to_value[i] = vertex.properties.IsPropertyEqual(property, values[i]);
|
||||
current_value_equal_to_value[i] = vertex.IsPropertyEqual(property, values[i]);
|
||||
property_array.values[i] = property;
|
||||
i++;
|
||||
}
|
||||
@ -163,7 +163,7 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std::
|
||||
// otherwise do a short-circuiting check (we already know !deleted && has_label)
|
||||
size_t i = 0;
|
||||
for (const auto &property : properties) {
|
||||
if (!vertex.properties.IsPropertyEqual(property, values[i])) return false;
|
||||
if (!vertex.IsPropertyEqual(property, values[i])) return false;
|
||||
i++;
|
||||
}
|
||||
return true;
|
||||
@ -269,7 +269,7 @@ void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const T
|
||||
}
|
||||
|
||||
for (auto &[props, storage] : constraint->second) {
|
||||
auto values = vertex->properties.ExtractPropertyValues(props);
|
||||
auto values = vertex->ExtractPropertyValues(props);
|
||||
|
||||
if (!values) {
|
||||
continue;
|
||||
@ -334,7 +334,7 @@ std::optional<ConstraintViolation> InMemoryUniqueConstraints::DoValidate(
|
||||
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
auto values = vertex.properties.ExtractPropertyValues(properties);
|
||||
auto values = vertex.ExtractPropertyValues(properties);
|
||||
if (!values) {
|
||||
return std::nullopt;
|
||||
}
|
||||
@ -359,7 +359,7 @@ void InMemoryUniqueConstraints::AbortEntries(std::span<Vertex const *const> vert
|
||||
}
|
||||
|
||||
for (auto &[props, storage] : constraint->second) {
|
||||
auto values = vertex->properties.ExtractPropertyValues(props);
|
||||
auto values = vertex->ExtractPropertyValues(props);
|
||||
|
||||
if (!values) {
|
||||
continue;
|
||||
@ -451,7 +451,7 @@ std::optional<ConstraintViolation> InMemoryUniqueConstraints::Validate(const Ver
|
||||
}
|
||||
|
||||
for (const auto &[properties, storage] : constraint->second) {
|
||||
auto value_array = vertex.properties.ExtractPropertyValues(properties);
|
||||
auto value_array = vertex.ExtractPropertyValues(properties);
|
||||
|
||||
if (!value_array) {
|
||||
continue;
|
||||
|
52
src/storage/v2/property_disk_store.cpp
Normal file
52
src/storage/v2/property_disk_store.cpp
Normal file
@ -0,0 +1,52 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "storage/v2/property_disk_store.hpp"
|
||||
|
||||
#include <rocksdb/compression_type.h>
|
||||
#include <rocksdb/filter_policy.h>
|
||||
#include <rocksdb/memtablerep.h>
|
||||
#include <rocksdb/options.h>
|
||||
#include <rocksdb/slice_transform.h>
|
||||
#include <rocksdb/statistics.h>
|
||||
#include <rocksdb/table.h>
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
PDS *PDS::ptr_ = nullptr;
|
||||
|
||||
PDS::PDS(std::filesystem::path root)
|
||||
: kvstore_{root / "pds", std::invoke([]() {
|
||||
rocksdb::Options options;
|
||||
rocksdb::BlockBasedTableOptions table_options;
|
||||
options.write_buffer_size = 1024U << 20U;
|
||||
options.writable_file_max_buffer_size = 64 * 1024 * 1024;
|
||||
|
||||
table_options.block_cache = rocksdb::NewLRUCache(1024 * 1024 * 1024);
|
||||
table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(sizeof(storage::Gid)));
|
||||
table_options.optimize_filters_for_memory = false;
|
||||
table_options.enable_index_compression = false;
|
||||
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
|
||||
options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(sizeof(storage::Gid)));
|
||||
options.max_background_jobs = 4;
|
||||
options.enable_pipelined_write = true;
|
||||
options.avoid_unnecessary_blocking_io = true;
|
||||
|
||||
options.create_if_missing = true;
|
||||
|
||||
options.use_direct_io_for_flush_and_compaction = true;
|
||||
options.use_direct_reads = true;
|
||||
|
||||
// options.compression = rocksdb::kLZ4HCCompression;
|
||||
return options;
|
||||
})} {}
|
||||
|
||||
} // namespace memgraph::storage
|
137
src/storage/v2/property_disk_store.hpp
Normal file
137
src/storage/v2/property_disk_store.hpp
Normal file
@ -0,0 +1,137 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <rocksdb/options.h>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <json/json.hpp>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <sstream>
|
||||
|
||||
#include "kvstore/kvstore.hpp"
|
||||
#include "slk/streams.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
|
||||
#include "slk/serialization.hpp"
|
||||
#include "storage/v2/replication/slk.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
class PDS {
|
||||
public:
|
||||
static void Init(std::filesystem::path root) {
|
||||
if (ptr_ == nullptr) ptr_ = new PDS(root);
|
||||
}
|
||||
|
||||
static void Deinit() { delete ptr_; }
|
||||
|
||||
static PDS *get() {
|
||||
if (ptr_ == nullptr) {
|
||||
ptr_ = new PDS("/tmp");
|
||||
}
|
||||
return ptr_;
|
||||
}
|
||||
|
||||
static std::string ToKey(Gid gid, PropertyId pid) {
|
||||
std::string key(sizeof(gid) + sizeof(pid), '\0');
|
||||
memcpy(key.data(), &gid, sizeof(gid));
|
||||
memcpy(&key[sizeof(gid)], &pid, sizeof(pid));
|
||||
return key;
|
||||
}
|
||||
|
||||
static std::string ToPrefix(Gid gid) {
|
||||
std::string key(sizeof(gid), '\0');
|
||||
memcpy(key.data(), &gid, sizeof(gid));
|
||||
return key;
|
||||
}
|
||||
|
||||
static Gid ToGid(std::string_view sv) {
|
||||
uint64_t gid;
|
||||
gid = *((uint64_t *)sv.data());
|
||||
return Gid::FromUint(gid);
|
||||
}
|
||||
|
||||
static PropertyId ToPid(std::string_view sv) {
|
||||
uint32_t pid;
|
||||
pid = *((uint32_t *)&sv[sizeof(Gid)]);
|
||||
return PropertyId::FromUint(pid);
|
||||
}
|
||||
|
||||
static PropertyValue ToPV(std::string_view sv) {
|
||||
PropertyValue pv;
|
||||
slk::Reader reader((const uint8_t *)sv.data(), sv.size());
|
||||
slk::Load(&pv, &reader);
|
||||
return pv;
|
||||
}
|
||||
|
||||
static std::string ToStr(const PropertyValue &pv) {
|
||||
std::string val{};
|
||||
slk::Builder builder([&val](const uint8_t *data, size_t size, bool /*have_more*/) {
|
||||
const auto old_size = val.size();
|
||||
val.resize(old_size + size);
|
||||
memcpy(&val[old_size], data, size);
|
||||
});
|
||||
slk::Save(pv, &builder);
|
||||
builder.Finalize();
|
||||
return val;
|
||||
}
|
||||
|
||||
std::optional<PropertyValue> Get(Gid gid, PropertyId pid) {
|
||||
const auto element = kvstore_.Get(ToKey(gid, pid));
|
||||
if (element) {
|
||||
return ToPV(*element);
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
size_t GetSize(Gid gid, PropertyId pid) {
|
||||
const auto element = kvstore_.Get(ToKey(gid, pid));
|
||||
if (element) {
|
||||
return element->size();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::map<PropertyId, PropertyValue> Get(Gid gid) {
|
||||
std::map<PropertyId, PropertyValue> res;
|
||||
auto itr = kvstore_.begin(ToPrefix(gid));
|
||||
auto end = kvstore_.end(ToPrefix(gid));
|
||||
for (; itr != end; ++itr) {
|
||||
if (!itr.IsValid()) continue;
|
||||
res[ToPid(itr->first)] = ToPV(itr->second);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
auto Set(Gid gid, PropertyId pid, const PropertyValue &pv) {
|
||||
if (pv.IsNull()) {
|
||||
return kvstore_.Delete(ToKey(gid, pid));
|
||||
}
|
||||
return kvstore_.Put(ToKey(gid, pid), ToStr(pv));
|
||||
}
|
||||
|
||||
void Clear(Gid gid) { kvstore_.DeletePrefix(ToPrefix(gid)); }
|
||||
|
||||
bool Has(Gid gid, PropertyId pid) { return kvstore_.Size(ToKey(gid, pid)) != 0; }
|
||||
|
||||
// kvstore::KVStore::iterator Itr() {}
|
||||
|
||||
private:
|
||||
PDS(std::filesystem::path root);
|
||||
kvstore::KVStore kvstore_;
|
||||
static PDS *ptr_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::storage
|
@ -46,6 +46,9 @@ Storage::Storage(Config config, StorageMode storage_mode)
|
||||
indices_(config, storage_mode),
|
||||
constraints_(config, storage_mode) {
|
||||
spdlog::info("Created database with {} storage mode.", StorageModeToString(storage_mode));
|
||||
|
||||
// TODO: Make this work with MT
|
||||
PDS::Init(config_.durability.storage_directory);
|
||||
}
|
||||
|
||||
Storage::Accessor::Accessor(SharedAccess /* tag */, Storage *storage, IsolationLevel isolation_level,
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -12,6 +12,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <limits>
|
||||
#include <optional>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
@ -21,19 +22,31 @@
|
||||
#include "storage/v2/property_store.hpp"
|
||||
#include "utils/rw_spin_lock.hpp"
|
||||
|
||||
#include "storage/v2/property_disk_store.hpp"
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
struct Vertex {
|
||||
Vertex(Gid gid, Delta *delta) : gid(gid), deleted(false), delta(delta) {
|
||||
Vertex(Gid gid, Delta *delta) : gid(gid), deleted(false), has_prop(false), delta(delta) {
|
||||
MG_ASSERT(delta == nullptr || delta->action == Delta::Action::DELETE_OBJECT ||
|
||||
delta->action == Delta::Action::DELETE_DESERIALIZED_OBJECT,
|
||||
"Vertex must be created with an initial DELETE_OBJECT delta!");
|
||||
}
|
||||
|
||||
~Vertex() {
|
||||
// TODO: Move to another place <- this will get called twice if moved...
|
||||
if (!moved) ClearProperties();
|
||||
}
|
||||
|
||||
Vertex(Vertex &) = delete;
|
||||
Vertex &operator=(Vertex &) = delete;
|
||||
Vertex(Vertex &&) noexcept = default;
|
||||
Vertex &operator=(Vertex &&) = delete;
|
||||
|
||||
const Gid gid;
|
||||
|
||||
std::vector<LabelId> labels;
|
||||
PropertyStore properties;
|
||||
// PropertyStore properties;
|
||||
|
||||
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
|
||||
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
|
||||
@ -42,8 +55,138 @@ struct Vertex {
|
||||
bool deleted;
|
||||
// uint8_t PAD;
|
||||
// uint16_t PAD;
|
||||
bool has_prop;
|
||||
|
||||
class HotFixMove {
|
||||
public:
|
||||
HotFixMove() {}
|
||||
HotFixMove(HotFixMove &&other) noexcept {
|
||||
if (this != &other) {
|
||||
// We want only the latest object to be marked as not-moved; while all previous should be marked as moved
|
||||
moved = false;
|
||||
other.moved = true;
|
||||
}
|
||||
}
|
||||
HotFixMove(HotFixMove &) = delete;
|
||||
HotFixMove &operator=(HotFixMove &) = delete;
|
||||
HotFixMove &operator=(HotFixMove &&) = delete;
|
||||
|
||||
operator bool() const { return moved; }
|
||||
|
||||
private:
|
||||
bool moved{false};
|
||||
} moved;
|
||||
|
||||
Delta *delta;
|
||||
|
||||
PropertyValue GetProperty(PropertyId property) const {
|
||||
// if (deleted) return {};
|
||||
if (!has_prop) return {};
|
||||
const auto prop = PDS::get()->Get(gid, property);
|
||||
if (prop) return *prop;
|
||||
return {};
|
||||
}
|
||||
|
||||
bool SetProperty(PropertyId property, const PropertyValue &value) {
|
||||
// if (deleted) return {};
|
||||
has_prop = true;
|
||||
return PDS::get()->Set(gid, property, value);
|
||||
}
|
||||
|
||||
bool HasProperty(PropertyId property) const {
|
||||
// if (deleted) return {};
|
||||
if (!has_prop) return {};
|
||||
return PDS::get()->Has(gid, property);
|
||||
}
|
||||
|
||||
bool HasAllProperties(const std::set<PropertyId> &properties) const {
|
||||
// if (deleted) return {};
|
||||
if (!has_prop) return {};
|
||||
return std::all_of(properties.begin(), properties.end(), [this](const auto &prop) { return HasProperty(prop); });
|
||||
}
|
||||
|
||||
bool IsPropertyEqual(PropertyId property, const PropertyValue &value) const {
|
||||
// if (deleted) return {};
|
||||
if (!has_prop) return value.IsNull();
|
||||
const auto val = GetProperty(property);
|
||||
return val == value;
|
||||
}
|
||||
|
||||
template <typename TContainer>
|
||||
bool InitProperties(const TContainer &properties) {
|
||||
// if (deleted) return {};
|
||||
auto *pds = PDS::get();
|
||||
for (const auto &[property, value] : properties) {
|
||||
if (value.IsNull()) {
|
||||
continue;
|
||||
}
|
||||
if (!pds->Set(gid, property, value)) {
|
||||
return false;
|
||||
}
|
||||
has_prop = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void ClearProperties() {
|
||||
if (!has_prop) return;
|
||||
has_prop = false;
|
||||
auto *pds = PDS::get();
|
||||
pds->Clear(gid);
|
||||
}
|
||||
|
||||
std::map<PropertyId, PropertyValue> Properties() {
|
||||
// if (deleted) return {};
|
||||
if (!has_prop) return {};
|
||||
return PDS::get()->Get(gid);
|
||||
}
|
||||
|
||||
std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>> UpdateProperties(
|
||||
std::map<PropertyId, PropertyValue> &properties) {
|
||||
// if (deleted) return {};
|
||||
auto old_properties = Properties();
|
||||
ClearProperties();
|
||||
|
||||
std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>> id_old_new_change;
|
||||
id_old_new_change.reserve(properties.size() + old_properties.size());
|
||||
for (const auto &[prop_id, new_value] : properties) {
|
||||
if (!old_properties.contains(prop_id)) {
|
||||
id_old_new_change.emplace_back(prop_id, PropertyValue(), new_value);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto &[old_key, old_value] : old_properties) {
|
||||
auto [it, inserted] = properties.emplace(old_key, old_value);
|
||||
if (!inserted) {
|
||||
auto &new_value = it->second;
|
||||
id_old_new_change.emplace_back(it->first, old_value, new_value);
|
||||
}
|
||||
}
|
||||
|
||||
MG_ASSERT(InitProperties(properties));
|
||||
return id_old_new_change;
|
||||
}
|
||||
|
||||
uint64_t PropertySize(PropertyId property) const {
|
||||
// if (deleted) return {};
|
||||
if (!has_prop) return {};
|
||||
return PDS::get()->GetSize(gid, property);
|
||||
}
|
||||
|
||||
std::optional<std::vector<PropertyValue>> ExtractPropertyValues(const std::set<PropertyId> &properties) const {
|
||||
// if (deleted) return {};
|
||||
if (!has_prop) return {};
|
||||
std::vector<PropertyValue> value_array;
|
||||
value_array.reserve(properties.size());
|
||||
for (const auto &prop : properties) {
|
||||
auto value = GetProperty(prop);
|
||||
if (value.IsNull()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
value_array.emplace_back(std::move(value));
|
||||
}
|
||||
return value_array;
|
||||
}
|
||||
};
|
||||
|
||||
static_assert(alignof(Vertex) >= 8, "The Vertex should be aligned to at least 8!");
|
||||
|
@ -261,7 +261,7 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
|
||||
|
||||
if (vertex_->deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
auto current_value = vertex_->properties.GetProperty(property);
|
||||
auto current_value = vertex_->GetProperty(property);
|
||||
// We could skip setting the value if the previous one is the same to the new
|
||||
// one. This would save some memory as a delta would not be created as well as
|
||||
// avoid copying the value. The reason we are not doing that is because the
|
||||
@ -272,7 +272,7 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
|
||||
utils::AtomicMemoryBlock atomic_memory_block{
|
||||
[transaction = transaction_, vertex = vertex_, &value, &property, ¤t_value]() {
|
||||
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, current_value);
|
||||
vertex->properties.SetProperty(property, value);
|
||||
vertex->SetProperty(property, value);
|
||||
}};
|
||||
std::invoke(atomic_memory_block);
|
||||
|
||||
@ -303,7 +303,7 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
|
||||
bool result{false};
|
||||
utils::AtomicMemoryBlock atomic_memory_block{
|
||||
[&result, &properties, storage = storage_, transaction = transaction_, vertex = vertex_]() {
|
||||
if (!vertex->properties.InitProperties(properties)) {
|
||||
if (!vertex->InitProperties(properties)) {
|
||||
result = false;
|
||||
return;
|
||||
}
|
||||
@ -339,11 +339,11 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
|
||||
|
||||
if (vertex_->deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
using ReturnType = decltype(vertex_->properties.UpdateProperties(properties));
|
||||
using ReturnType = decltype(vertex_->UpdateProperties(properties));
|
||||
std::optional<ReturnType> id_old_new_change;
|
||||
utils::AtomicMemoryBlock atomic_memory_block{
|
||||
[storage = storage_, transaction = transaction_, vertex = vertex_, &properties, &id_old_new_change]() {
|
||||
id_old_new_change.emplace(vertex->properties.UpdateProperties(properties));
|
||||
id_old_new_change.emplace(vertex->UpdateProperties(properties));
|
||||
if (!id_old_new_change.has_value()) {
|
||||
return;
|
||||
}
|
||||
@ -375,11 +375,11 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
|
||||
|
||||
if (vertex_->deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
using ReturnType = decltype(vertex_->properties.Properties());
|
||||
using ReturnType = decltype(vertex_->Properties());
|
||||
std::optional<ReturnType> properties;
|
||||
utils::AtomicMemoryBlock atomic_memory_block{
|
||||
[storage = storage_, transaction = transaction_, vertex = vertex_, &properties]() {
|
||||
properties.emplace(vertex->properties.Properties());
|
||||
properties.emplace(vertex->Properties());
|
||||
if (!properties.has_value()) {
|
||||
return;
|
||||
}
|
||||
@ -391,7 +391,7 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
|
||||
if (transaction->constraint_verification_info) {
|
||||
transaction->constraint_verification_info->RemovedProperty(vertex);
|
||||
}
|
||||
vertex->properties.ClearProperties();
|
||||
vertex->ClearProperties();
|
||||
}};
|
||||
std::invoke(atomic_memory_block);
|
||||
|
||||
@ -406,7 +406,7 @@ Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view
|
||||
{
|
||||
auto guard = std::shared_lock{vertex_->lock};
|
||||
deleted = vertex_->deleted;
|
||||
value = vertex_->properties.GetProperty(property);
|
||||
value = vertex_->GetProperty(property);
|
||||
delta = vertex_->delta;
|
||||
}
|
||||
|
||||
@ -451,7 +451,7 @@ Result<uint64_t> VertexAccessor::GetPropertySize(PropertyId property, View view)
|
||||
auto guard = std::shared_lock{vertex_->lock};
|
||||
Delta *delta = vertex_->delta;
|
||||
if (!delta) {
|
||||
return vertex_->properties.PropertySize(property);
|
||||
return vertex_->PropertySize(property);
|
||||
}
|
||||
}
|
||||
|
||||
@ -474,7 +474,7 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view
|
||||
{
|
||||
auto guard = std::shared_lock{vertex_->lock};
|
||||
deleted = vertex_->deleted;
|
||||
properties = vertex_->properties.Properties();
|
||||
properties = vertex_->Properties();
|
||||
delta = vertex_->delta;
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -143,7 +143,8 @@ inline std::string SerializeEdgeAsValue(const std::string &src_vertex_gid, const
|
||||
result += edge_type_str;
|
||||
result += "|";
|
||||
if (edge) {
|
||||
return result + utils::SerializeProperties(edge->properties);
|
||||
// TODO: Re-enable
|
||||
// return result + utils::SerializeProperties(edge->properties);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -921,23 +921,27 @@ class SkipList final : detail::SkipListNode_base {
|
||||
}
|
||||
|
||||
SkipList(SkipList &&other) noexcept : head_(other.head_), gc_(other.GetMemoryResource()), size_(other.size_.load()) {
|
||||
other.head_ = nullptr;
|
||||
if (this != &other) {
|
||||
other.head_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
SkipList &operator=(SkipList &&other) noexcept {
|
||||
MG_ASSERT(other.GetMemoryResource() == GetMemoryResource(),
|
||||
"Move assignment with different MemoryResource is not supported");
|
||||
TNode *head = head_;
|
||||
while (head != nullptr) {
|
||||
TNode *succ = head->nexts[0].load(std::memory_order_acquire);
|
||||
size_t bytes = SkipListNodeSize(*head);
|
||||
head->~TNode();
|
||||
GetMemoryResource()->Deallocate(head, bytes);
|
||||
head = succ;
|
||||
if (this != &other) {
|
||||
MG_ASSERT(other.GetMemoryResource() == GetMemoryResource(),
|
||||
"Move assignment with different MemoryResource is not supported");
|
||||
TNode *head = head_;
|
||||
while (head != nullptr) {
|
||||
TNode *succ = head->nexts[0].load(std::memory_order_acquire);
|
||||
size_t bytes = SkipListNodeSize(*head);
|
||||
head->~TNode();
|
||||
GetMemoryResource()->Deallocate(head, bytes);
|
||||
head = succ;
|
||||
}
|
||||
head_ = other.head_;
|
||||
size_ = other.size_.load();
|
||||
other.head_ = nullptr;
|
||||
}
|
||||
head_ = other.head_;
|
||||
size_ = other.size_.load();
|
||||
other.head_ = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
@ -452,3 +452,6 @@ add_unit_test(coordinator_cluster_state.cpp)
|
||||
target_link_libraries(${test_prefix}coordinator_cluster_state gflags mg-coordination mg-repl_coord_glue)
|
||||
target_include_directories(${test_prefix}coordinator_cluster_state PRIVATE ${CMAKE_SOURCE_DIR}/include)
|
||||
endif()
|
||||
|
||||
add_unit_test(pds.cpp)
|
||||
target_link_libraries(${test_prefix}pds mg-storage-v2)
|
||||
|
@ -9,9 +9,11 @@
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <filesystem>
|
||||
#include "bfs_common.hpp"
|
||||
|
||||
#include "disk_test_utils.hpp"
|
||||
#include "storage/v2/config.hpp"
|
||||
#include "storage/v2/disk/storage.hpp"
|
||||
#include "storage/v2/inmemory/storage.hpp"
|
||||
|
||||
@ -22,13 +24,19 @@ template <typename StorageType>
|
||||
class SingleNodeDb : public Database {
|
||||
public:
|
||||
const std::string testSuite = "bfs_single_node";
|
||||
const std::filesystem::path root_test = "/tmp/" + testSuite;
|
||||
|
||||
SingleNodeDb() : config_(disk_test_utils::GenerateOnDiskConfig(testSuite)), db_(new StorageType(config_)) {}
|
||||
SingleNodeDb() : config_(disk_test_utils::GenerateOnDiskConfig(testSuite)), db_(new StorageType(config_)) {
|
||||
memgraph::storage::UpdatePaths(config_, root_test);
|
||||
}
|
||||
|
||||
~SingleNodeDb() override {
|
||||
if (std::is_same<StorageType, memgraph::storage::DiskStorage>::value) {
|
||||
disk_test_utils::RemoveRocksDbDirs(testSuite);
|
||||
}
|
||||
if (std::filesystem::exists(root_test)) {
|
||||
std::filesystem::remove_all(root_test);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<memgraph::storage::Storage::Accessor> Access() override {
|
||||
|
328
tests/unit/pds.cpp
Normal file
328
tests/unit/pds.cpp
Normal file
@ -0,0 +1,328 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/property_disk_store.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/temporal.hpp"
|
||||
|
||||
const static std::filesystem::path test_root{"/tmp/MG_pds_test"};
|
||||
|
||||
class PdsTest : public ::testing::Test {
|
||||
protected:
|
||||
PdsTest() { memgraph::storage::PDS::Init(test_root); }
|
||||
|
||||
~PdsTest() override {
|
||||
try {
|
||||
if (std::filesystem::exists(test_root)) {
|
||||
std::filesystem::remove_all(test_root);
|
||||
}
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(PdsTest, Keys) {
|
||||
using namespace memgraph::storage;
|
||||
auto *pds = PDS::get();
|
||||
|
||||
auto gid = Gid::FromUint(13);
|
||||
const auto gid_sv = pds->ToPrefix(gid);
|
||||
EXPECT_TRUE(memcmp(&gid, gid_sv.data(), sizeof(uint64_t)) == 0);
|
||||
EXPECT_EQ(gid, pds->ToGid(gid_sv));
|
||||
|
||||
auto pid = PropertyId::FromUint(243);
|
||||
const auto key = pds->ToKey(gid, pid);
|
||||
EXPECT_TRUE(memcmp(&gid, key.data(), sizeof(uint64_t)) == 0);
|
||||
EXPECT_TRUE(memcmp(&pid, &key[sizeof(gid)], sizeof(uint32_t)) == 0);
|
||||
EXPECT_EQ(pid, pds->ToPid(key));
|
||||
}
|
||||
|
||||
TEST_F(PdsTest, BasicUsage) {
|
||||
using namespace memgraph::storage;
|
||||
auto *pds = PDS::get();
|
||||
|
||||
Gid gid;
|
||||
PropertyId pid;
|
||||
|
||||
PropertyValue pv_bf(false);
|
||||
PropertyValue pv_bt(true);
|
||||
PropertyValue pv_int0(0);
|
||||
PropertyValue pv_int1(1);
|
||||
PropertyValue pv_int16(16);
|
||||
PropertyValue pv_640(int64_t(0));
|
||||
PropertyValue pv_641(int64_t(1));
|
||||
PropertyValue pv_64256(int64_t(256));
|
||||
PropertyValue pv_double0(0.0);
|
||||
PropertyValue pv_double1(1.0);
|
||||
PropertyValue pv_double1024(10.24);
|
||||
PropertyValue pv_str("");
|
||||
PropertyValue pv_str0("0");
|
||||
PropertyValue pv_strabc("abc");
|
||||
PropertyValue pv_tdldt0(TemporalData(TemporalType::LocalDateTime, 0));
|
||||
PropertyValue pv_tdlt0(TemporalData(TemporalType::LocalTime, 0));
|
||||
PropertyValue pv_tdd0(TemporalData(TemporalType::Date, 0));
|
||||
PropertyValue pv_tddur0(TemporalData(TemporalType::Duration, 0));
|
||||
PropertyValue pv_tdldt1(TemporalData(TemporalType::LocalDateTime, 100000));
|
||||
PropertyValue pv_tdlt1(TemporalData(TemporalType::LocalTime, 100000));
|
||||
PropertyValue pv_tdd1(TemporalData(TemporalType::Date, 100000));
|
||||
PropertyValue pv_tddur1(TemporalData(TemporalType::Duration, 100000));
|
||||
PropertyValue pv_v(std::vector<PropertyValue>{PropertyValue(false), PropertyValue(1), PropertyValue(256),
|
||||
PropertyValue(1.123), PropertyValue("")});
|
||||
PropertyValue pv_vv(std::vector<PropertyValue>{
|
||||
PropertyValue{std::vector<PropertyValue>{PropertyValue(false), PropertyValue(1), PropertyValue(256),
|
||||
PropertyValue(1.123), PropertyValue("")}},
|
||||
PropertyValue{"string"}, PropertyValue{"list"}});
|
||||
|
||||
auto test = [&] {
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_bf));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsBool());
|
||||
ASSERT_FALSE(val->ValueBool());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_bt));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsBool());
|
||||
ASSERT_TRUE(val->ValueBool());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_int0));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsInt());
|
||||
ASSERT_EQ(val->ValueInt(), 0);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_int1));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsInt());
|
||||
ASSERT_EQ(val->ValueInt(), 1);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_int16));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsInt());
|
||||
ASSERT_EQ(val->ValueInt(), 16);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_640));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsInt());
|
||||
ASSERT_EQ(val->ValueInt(), 0);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_641));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsInt());
|
||||
ASSERT_EQ(val->ValueInt(), 1);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_64256));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsInt());
|
||||
ASSERT_EQ(val->ValueInt(), 256);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_double0));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsDouble());
|
||||
ASSERT_EQ(val->ValueDouble(), 0.0);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_double1));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsDouble());
|
||||
ASSERT_EQ(val->ValueDouble(), 1.0);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_double1024));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsDouble());
|
||||
ASSERT_EQ(val->ValueDouble(), 10.24);
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_str));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsString());
|
||||
ASSERT_EQ(val->ValueString(), "");
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_str0));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsString());
|
||||
ASSERT_EQ(val->ValueString(), "0");
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_strabc));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsString());
|
||||
ASSERT_EQ(val->ValueString(), "abc");
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_tdd0));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsTemporalData());
|
||||
ASSERT_EQ(val->ValueTemporalData(), pv_tdd0.ValueTemporalData());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_tdd1));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsTemporalData());
|
||||
ASSERT_EQ(val->ValueTemporalData(), pv_tdd1.ValueTemporalData());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_tddur0));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsTemporalData());
|
||||
ASSERT_EQ(val->ValueTemporalData(), pv_tddur0.ValueTemporalData());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_tddur1));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsTemporalData());
|
||||
ASSERT_EQ(val->ValueTemporalData(), pv_tddur1.ValueTemporalData());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_tdldt0));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsTemporalData());
|
||||
ASSERT_EQ(val->ValueTemporalData(), pv_tdldt0.ValueTemporalData());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_tdldt1));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsTemporalData());
|
||||
ASSERT_EQ(val->ValueTemporalData(), pv_tdldt1.ValueTemporalData());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_tdlt0));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsTemporalData());
|
||||
ASSERT_EQ(val->ValueTemporalData(), pv_tdlt0.ValueTemporalData());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_tdlt1));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsTemporalData());
|
||||
ASSERT_EQ(val->ValueTemporalData(), pv_tdlt1.ValueTemporalData());
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_v));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsList());
|
||||
const auto list = val->ValueList();
|
||||
ASSERT_EQ(list.size(), 5);
|
||||
ASSERT_EQ(list[0], PropertyValue(false));
|
||||
ASSERT_EQ(list[1], PropertyValue(1));
|
||||
ASSERT_EQ(list[2], PropertyValue(256));
|
||||
ASSERT_EQ(list[3], PropertyValue(1.123));
|
||||
ASSERT_EQ(list[4], PropertyValue(""));
|
||||
}
|
||||
{
|
||||
ASSERT_TRUE(pds->Set(gid, pid, pv_vv));
|
||||
const auto val = pds->Get(gid, pid);
|
||||
ASSERT_TRUE(val);
|
||||
ASSERT_TRUE(val->IsList());
|
||||
const auto list = val->ValueList();
|
||||
ASSERT_EQ(list.size(), 3);
|
||||
{
|
||||
const auto &val = list[0];
|
||||
ASSERT_TRUE(val.IsList());
|
||||
const auto list = val.ValueList();
|
||||
ASSERT_EQ(list.size(), 5);
|
||||
ASSERT_EQ(list[0], PropertyValue(false));
|
||||
ASSERT_EQ(list[1], PropertyValue(1));
|
||||
ASSERT_EQ(list[2], PropertyValue(256));
|
||||
ASSERT_EQ(list[3], PropertyValue(1.123));
|
||||
ASSERT_EQ(list[4], PropertyValue(""));
|
||||
}
|
||||
ASSERT_EQ(list[1], PropertyValue("string"));
|
||||
ASSERT_EQ(list[2], PropertyValue("list"));
|
||||
}
|
||||
};
|
||||
|
||||
gid.FromUint(0);
|
||||
pid.FromUint(0);
|
||||
test();
|
||||
|
||||
gid.FromUint(0);
|
||||
pid.FromUint(1);
|
||||
test();
|
||||
|
||||
gid.FromUint(1);
|
||||
pid.FromUint(0);
|
||||
test();
|
||||
|
||||
gid.FromUint(1);
|
||||
pid.FromUint(1);
|
||||
test();
|
||||
|
||||
gid.FromUint(0);
|
||||
pid.FromUint(5446516);
|
||||
test();
|
||||
|
||||
gid.FromUint(654645);
|
||||
pid.FromUint(0);
|
||||
test();
|
||||
|
||||
gid.FromUint(987615);
|
||||
pid.FromUint(565);
|
||||
test();
|
||||
}
|
||||
|
||||
TEST_F(PdsTest, Get) {
|
||||
using namespace memgraph::storage;
|
||||
auto *pds = PDS::get();
|
||||
pds->Set(Gid::FromUint(0), PropertyId::FromUint(1), PropertyValue{"test1"});
|
||||
pds->Set(Gid::FromUint(0), PropertyId::FromUint(2), PropertyValue{"test2"});
|
||||
pds->Set(Gid::FromUint(0), PropertyId::FromUint(3), PropertyValue{"test3"});
|
||||
pds->Set(Gid::FromUint(1), PropertyId::FromUint(0), PropertyValue{"test0"});
|
||||
pds->Set(Gid::FromUint(1), PropertyId::FromUint(2), PropertyValue{"test02"});
|
||||
|
||||
auto all_0 = pds->Get(Gid::FromUint(0));
|
||||
ASSERT_EQ(all_0.size(), 3);
|
||||
ASSERT_EQ(all_0[PropertyId::FromUint(1)], PropertyValue{"test1"});
|
||||
ASSERT_EQ(all_0[PropertyId::FromUint(2)], PropertyValue{"test2"});
|
||||
ASSERT_EQ(all_0[PropertyId::FromUint(3)], PropertyValue{"test3"});
|
||||
}
|
@ -137,11 +137,10 @@ class DeltaGenerator final {
|
||||
void SetProperty(memgraph::storage::Vertex *vertex, const std::string &property,
|
||||
const memgraph::storage::PropertyValue &value) {
|
||||
auto property_id = memgraph::storage::PropertyId::FromUint(gen_->mapper_.NameToId(property));
|
||||
auto &props = vertex->properties;
|
||||
auto old_value = props.GetProperty(property_id);
|
||||
auto old_value = vertex->GetProperty(property_id);
|
||||
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::SetPropertyTag(),
|
||||
property_id, old_value);
|
||||
props.SetProperty(property_id, value);
|
||||
vertex->SetProperty(property_id, value);
|
||||
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
|
||||
{
|
||||
memgraph::storage::durability::WalDeltaData data;
|
||||
@ -185,7 +184,7 @@ class DeltaGenerator final {
|
||||
ASSERT_NE(vertex, gen_->vertices_.end());
|
||||
auto property_id = memgraph::storage::PropertyId::FromUint(
|
||||
gen_->mapper_.NameToId(data.vertex_edge_set_property.property));
|
||||
data.vertex_edge_set_property.value = vertex->properties.GetProperty(property_id);
|
||||
data.vertex_edge_set_property.value = vertex->GetProperty(property_id);
|
||||
}
|
||||
gen_->data_.emplace_back(commit_timestamp, data);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user