Delete api

This commit is contained in:
Andi Skrgat 2023-04-07 09:56:00 +02:00
parent 1bed62da4a
commit b6beffa9e2
2 changed files with 224 additions and 114 deletions

View File

@ -15,11 +15,14 @@
#include <rocksdb/iterator.h>
#include <rocksdb/options.h>
#include <rocksdb/status.h>
#include <iterator>
#include <numeric>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <tuple>
#include <vector>
#include "query/common.hpp"
#include "query/db_accessor.hpp"
#include "spdlog/spdlog.h"
@ -42,7 +45,17 @@
namespace memgraph::storage::rocks {
inline void CheckRocksDBStatus(rocksdb::Status status) { MG_ASSERT(status.ok(), "rocksdb: {}", status.ToString()); }
/// Use it for operations that must successfully finish.
inline void AssertRocksDBStatus(const rocksdb::Status &status) {
MG_ASSERT(status.ok(), "rocksdb: {}", status.ToString());
}
inline bool CheckRocksDBStatus(const rocksdb::Status &status) {
if (!status.ok()) {
spdlog::error("rocksdb: {}", status.ToString());
}
return status.ok();
}
class RocksDBStorage {
public:
@ -51,75 +64,37 @@ class RocksDBStorage {
// options_.OptimizeLevelStyleCompaction();
std::filesystem::path rocksdb_path = "./rocks_experiment_unit_test";
MG_ASSERT(utils::EnsureDir(rocksdb_path), "Unable to create storage folder on the disk.");
CheckRocksDBStatus(rocksdb::DB::Open(options_, rocksdb_path, &db_));
CheckRocksDBStatus(db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "vertex", &vertex_chandle));
CheckRocksDBStatus(db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "edge", &edge_chandle));
AssertRocksDBStatus(rocksdb::DB::Open(options_, rocksdb_path, &db_));
AssertRocksDBStatus(db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "vertex", &vertex_chandle));
AssertRocksDBStatus(db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "edge", &edge_chandle));
}
// TODO: explicitly delete other constructors
~RocksDBStorage() {
CheckRocksDBStatus(db_->DropColumnFamily(vertex_chandle));
CheckRocksDBStatus(db_->DropColumnFamily(edge_chandle));
CheckRocksDBStatus(db_->Close());
AssertRocksDBStatus(db_->DropColumnFamily(vertex_chandle));
AssertRocksDBStatus(db_->DropColumnFamily(edge_chandle));
AssertRocksDBStatus(db_->Close());
delete db_;
}
// STORING PART
// EDGE ACCESSOR FUNCTIONALITIES
// -----------------------------------------------------------
// Serialize and store in-memory vertex to the disk.
// TODO: write the exact format
// Properties are serialized as the value
void StoreVertex(const query::VertexAccessor &vertex_acc) {
CheckRocksDBStatus(db_->Put(rocksdb::WriteOptions(), vertex_chandle, SerializeVertex(vertex_acc),
SerializeProperties(vertex_acc.PropertyStore())));
// fetch the edge's source vertex by its GID
std::optional<query::VertexAccessor> FromVertex(const query::EdgeAccessor &edge_acc, query::DbAccessor &dba) {
return FindVertex(SerializeIdType(edge_acc.From().Gid()), dba);
}
// TODO: remove config being sent as the parameter. Later will be added as the part of the storage accessor as in the
// memory version. for now assume that we always operate with edges having properties
void StoreEdge(const query::EdgeAccessor &edge_acc) {
auto [src_dest_key, dest_src_key] = SerializeEdge(edge_acc);
const std::string value = SerializeProperties(edge_acc.PropertyStore());
CheckRocksDBStatus(db_->Put(rocksdb::WriteOptions(), edge_chandle, src_dest_key, value));
CheckRocksDBStatus(db_->Put(rocksdb::WriteOptions(), edge_chandle, dest_src_key, value));
// fetch the edge's destination vertex by its GID
std::optional<query::VertexAccessor> ToVertex(const query::EdgeAccessor &edge_acc, query::DbAccessor &dba) {
return FindVertex(SerializeIdType(edge_acc.To().Gid()), dba);
}
// UPDATE PART
// -----------------------------------------------------------
// VERTEX ACCESSOR FUNCTIONALITIES
// ------------------------------------------------------------
// Clear all entries from the database.
// TODO: check if this deletes all entries, or you also need to specify handle here
// TODO: This will not be needed in the production code and can possibly removed in testing
void Clear() {
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
db_->Delete(rocksdb::WriteOptions(), it->key().ToString());
}
delete it;
}
// READ PART
// -----------------------------------------------------------
// TODO: change it to std::optional if the value doesn't exist
// TODO: if the need comes for using also a GID object, use std::variant
// This should be part of edge accessor since it should be impossible to search vertex by a gid
// This should again be changed when we have mulitple same vertices
std::optional<query::VertexAccessor> Vertex(const std::string_view gid, query::DbAccessor &dba) {
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
const std::string_view key = it->key().ToStringView();
if (key.starts_with(gid)) {
return DeserializeVertex(key, it->value().ToStringView(), dba);
}
}
delete it;
return std::nullopt;
}
// TODO: currently search in the same database instance vertices and edges but change later to column families
// Out edges of one vertex_acc with gid src_gid have following format in the RocksDB:
// The VertexAccessor's out edge with gid src_gid has the following format in the RocksDB:
// src_gid | other_vertex_gid | 0 | ...
// other_vertex_gid | src_gid | 1 | ...
// we use the firt way since this should be possible to optimize using Bloom filters and prefix search
@ -137,7 +112,7 @@ class RocksDBStorage {
return out_edges;
}
// InEdges of one vertex_acc with GID "dest_gid" have following format in the RocksDB:
// The VertexAccessor's out edge with gid src_gid has the following format in the RocksDB:
// other_vertex_gid | dest_gid | 0 | ...
// dest_gid | other_verte_gid | 1 | ...
// we use the second way since this should be possible to optimize using Bloom filters and prefix search.
@ -155,9 +130,123 @@ class RocksDBStorage {
return in_edges;
}
// Read all vertices stored in the database by a label
// TODO: rewrite the code with some lambda operations
// certainly can be a bit optimized so that new object isn't created if can be discarded
// TODO: how will we handle new vertex creation
// STORAGE ACCESSOR FUNCTIONALITIES
// -----------------------------------------------------------
// TODO: how will we handle new edge creation
/// @return Accessor to the deleted edge if a deletion took place, std::nullopt otherwise.
/// Delete two edge entries since on edge is represented on a two-fold level.
/// Edges are deleted from logical partition containing edges.
std::optional<query::EdgeAccessor> DeleteEdge(const query::EdgeAccessor &edge_acc) {
auto [src_dest_key, dest_src_key] = SerializeEdge(edge_acc);
if (!CheckRocksDBStatus(db_->Delete(rocksdb::WriteOptions(), edge_chandle, src_dest_key)) ||
!CheckRocksDBStatus(db_->Delete(rocksdb::WriteOptions(), edge_chandle, dest_src_key))) {
return std::nullopt;
}
return edge_acc;
}
/// Helper function, not used in the real accessor.
std::optional<std::vector<query::EdgeAccessor>> DeleteEdges(const auto &edge_accessors) {
std::vector<query::EdgeAccessor> edge_accs;
for (auto &&it : edge_accessors) {
if (const auto deleted_edge_res = DeleteEdge(it); !deleted_edge_res.has_value()) {
return std::nullopt;
}
edge_accs.push_back(it);
}
return edge_accs;
}
/// @return A reference to the deleted vertex accessor if deleted, otherwise std::nullopt.
/// Delete vertex from logical partition containing vertices.
std::optional<query::VertexAccessor> DeleteVertex(const query::VertexAccessor &vertex_acc) {
if (!CheckRocksDBStatus(db_->Delete(rocksdb::WriteOptions(), vertex_chandle, SerializeVertex(vertex_acc)))) {
return std::nullopt;
}
return vertex_acc;
}
/// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise.
/// Delete vertex from logical partition containing vertices.
/// For each edge delete two key-value entries from logical partition containing edges.
std::optional<std::pair<query::VertexAccessor, std::vector<query::EdgeAccessor>>> DetachDeleteVertex(
const query::VertexAccessor &vertex_acc) {
auto del_vertex = DeleteVertex(vertex_acc);
if (!del_vertex.has_value()) {
return std::nullopt;
}
auto out_edges = vertex_acc.OutEdges(storage::View::OLD);
auto in_edges = vertex_acc.InEdges(storage::View::OLD);
if (out_edges.HasError() || in_edges.HasError()) {
return std::nullopt;
}
if (auto del_edges = DeleteEdges(*out_edges), del_in_edges = DeleteEdges(*in_edges);
del_edges.has_value() && del_in_edges.has_value()) {
del_edges->insert(del_in_edges->end(), std::make_move_iterator(del_in_edges->begin()),
std::make_move_iterator(del_in_edges->end()));
return std::make_pair(*del_vertex, *del_edges);
}
return std::nullopt;
}
// STORING
// -----------------------------------------------------------
/// Serialize and store in-memory vertex to the disk.
/// TODO: write the exact format
/// Properties are serialized as the value
void StoreVertex(const query::VertexAccessor &vertex_acc) {
AssertRocksDBStatus(db_->Put(rocksdb::WriteOptions(), vertex_chandle, SerializeVertex(vertex_acc),
SerializeProperties(vertex_acc.PropertyStore())));
}
/// TODO: remove config being sent as the parameter. Later will be added as the part of the storage accessor as in the
/// memory version. for now assume that we always operate with edges having properties
void StoreEdge(const query::EdgeAccessor &edge_acc) {
auto [src_dest_key, dest_src_key] = SerializeEdge(edge_acc);
const std::string value = SerializeProperties(edge_acc.PropertyStore());
AssertRocksDBStatus(db_->Put(rocksdb::WriteOptions(), edge_chandle, src_dest_key, value));
AssertRocksDBStatus(db_->Put(rocksdb::WriteOptions(), edge_chandle, dest_src_key, value));
}
// UPDATE PART
// -----------------------------------------------------------
/// Clear all entries from the database.
/// TODO: check if this deletes all entries, or you also need to specify handle here
/// TODO: This will not be needed in the production code and can possibly removed in testing
void Clear() {
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
db_->Delete(rocksdb::WriteOptions(), it->key().ToString());
}
delete it;
}
// READ PART
// -----------------------------------------------------------
/// TODO: if the need comes for using also a GID object, use std::variant
/// This should again be changed when we have mulitple same vertices
std::optional<query::VertexAccessor> FindVertex(const std::string_view gid, query::DbAccessor &dba) {
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
const std::string_view key = it->key().ToStringView();
if (const auto vertex_parts = utils::Split(key, "|"); vertex_parts[1] == gid) {
return DeserializeVertex(key, it->value().ToStringView(), dba);
}
}
delete it;
return std::nullopt;
}
/// Read all vertices stored in the database by a label
/// TODO: rewrite the code with some lambda operations
/// certainly can be a bit optimized so that new object isn't created if can be discarded
std::vector<query::VertexAccessor> Vertices(query::DbAccessor &dba, const storage::LabelId &label_id) {
std::vector<query::VertexAccessor> vertices;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle);
@ -171,9 +260,9 @@ class RocksDBStorage {
return vertices;
}
// TODO: rewrite the code with means of lambda operation
// TODO: we need to this, otherwise we will have to change a lot of things as we are dealing on the low level
// certainly can be a bit optimized so that new object isn't created if can be discarded
/// TODO: rewrite the code with means of lambda operation
/// TODO: we need to this, otherwise we will have to change a lot of things as we are dealing on the low level
/// certainly can be a bit optimized so that new object isn't created if can be discarded
std::vector<query::VertexAccessor> Vertices(query::DbAccessor &dba, const storage::PropertyId &property_id,
const storage::PropertyValue &prop_value) {
std::vector<query::VertexAccessor> vertices;
@ -213,8 +302,8 @@ class RocksDBStorage {
return ser_labels;
}
// TODO: write a documentation for the method
// TODO: add deserialization equivalent method
/// TODO: write a documentation for the method
/// TODO: add deserialization equivalent method
inline std::string SerializeIdType(const auto &id) { return std::to_string(id.AsUint()); }
std::string SerializeVertex(const query::VertexAccessor &vertex_acc) {
@ -256,8 +345,8 @@ class RocksDBStorage {
return impl;
}
// Serializes edge accessor to obtain a key for the key-value store
// returns two string because there will be two keys since edge is stored in both directions
/// Serializes edge accessor to obtain a key for the key-value store
/// returns two string because there will be two keys since edge is stored in both directions
std::pair<std::string, std::string> SerializeEdge(const query::EdgeAccessor &edge_acc) {
// Serialized objects
auto from_gid = SerializeIdType(edge_acc.From().Gid());
@ -293,8 +382,8 @@ class RocksDBStorage {
},
edge_parts);
// load vertex accessors
auto from_acc = Vertex(from_gid, dba);
auto to_acc = Vertex(to_gid, dba);
auto from_acc = FindVertex(from_gid, dba);
auto to_acc = FindVertex(to_gid, dba);
if (!from_acc.has_value() || !to_acc.has_value()) {
throw utils::BasicException("Non-existing vertices during edge deserialization");
}

View File

@ -249,53 +249,74 @@ TEST_F(RocksDBStorageTest, SerializeEdge) {
*src_out_edge.Properties(memgraph::storage::View::OLD));
}
TEST_F(RocksDBStorageTest, SerializeVertexGIDProperties) {
// serializes vertex's gid, multiple labels and properties
TEST_F(RocksDBStorageTest, DeleteVertex) {
auto storage_dba = storage.Access(memgraph::storage::IsolationLevel::READ_UNCOMMITTED);
memgraph::query::DbAccessor dba(&storage_dba);
// prepare labels
std::vector<memgraph::storage::LabelId> label_ids{dba.NameToLabel("Player"), dba.NameToLabel("Person"),
dba.NameToLabel("Ball")};
// prepare properties
std::map<memgraph::storage::PropertyId, memgraph::storage::PropertyValue> properties;
properties.emplace(dba.NameToProperty("name"), memgraph::storage::PropertyValue("disk"));
properties.emplace(dba.NameToProperty("memory"), memgraph::storage::PropertyValue("1TB"));
properties.emplace(dba.NameToProperty("price"), memgraph::storage::PropertyValue(1000.21));
// gids
std::unordered_set<uint64_t> gids;
for (int i = 0; i < 5; ++i) {
gids.insert(i);
auto impl = dba.InsertVertex();
impl.SetGid(memgraph::storage::Gid::FromUint(i));
impl.AddLabel(label_ids[i % 3]);
impl.AddLabel(label_ids[(i + 1) % 3]);
memgraph::query::MultiPropsInitChecked(&impl, properties);
db.StoreVertex(impl);
}
// load vertices from disk
auto loaded_vertices = db.Vertices(dba);
ASSERT_EQ(loaded_vertices.size(), 5);
for (const auto &vertex_acc : loaded_vertices) {
ASSERT_TRUE(gids.contains(vertex_acc.Gid().AsUint()));
// labels
auto labels = vertex_acc.Labels(memgraph::storage::View::OLD);
ASSERT_EQ(labels->size(), 2);
ASSERT_TRUE(std::all_of(labels->begin(), labels->end(), [&label_ids](const auto &label_id) {
return std::find(label_ids.begin(), label_ids.end(), label_id) != label_ids.end();
}));
// check properties
auto props = vertex_acc.Properties(memgraph::storage::View::OLD);
ASSERT_FALSE(props.HasError());
auto prop_name = vertex_acc.GetProperty(memgraph::storage::View::OLD, dba.NameToProperty("name"));
auto prop_memory = vertex_acc.GetProperty(memgraph::storage::View::OLD, dba.NameToProperty("memory"));
auto prop_price = vertex_acc.GetProperty(memgraph::storage::View::OLD, dba.NameToProperty("price"));
auto prop_unexisting = vertex_acc.GetProperty(memgraph::storage::View::OLD, dba.NameToProperty("random"));
ASSERT_TRUE(prop_name->IsString());
ASSERT_EQ(prop_name->ValueString(), "disk");
ASSERT_TRUE(prop_memory->IsString());
ASSERT_EQ(prop_memory->ValueString(), "1TB");
ASSERT_TRUE(prop_price->IsDouble());
ASSERT_DOUBLE_EQ(prop_price->ValueDouble(), 1000.21);
ASSERT_TRUE(prop_unexisting->IsNull());
}
properties.emplace(dba.NameToProperty("sum"), memgraph::storage::PropertyValue("2TB"));
properties.emplace(dba.NameToProperty("same_type"), memgraph::storage::PropertyValue(true));
properties.emplace(dba.NameToProperty("cluster_price"), memgraph::storage::PropertyValue(2000.42));
// create vertex
auto impl = dba.InsertVertex();
impl.AddLabel(dba.NameToLabel("Player"));
memgraph::query::MultiPropsInitChecked(&impl, properties);
db.StoreVertex(impl);
// find vertex should work now
ASSERT_TRUE(db.FindVertex(std::to_string(impl.Gid().AsUint()), dba).has_value());
// RocksDB doesn't physically delete entry so deletion will pass two times
ASSERT_TRUE(db.DeleteVertex(impl).has_value());
ASSERT_TRUE(db.DeleteVertex(impl).has_value());
// second time you shouldn't be able to find the vertex
ASSERT_FALSE(db.FindVertex(std::to_string(impl.Gid().AsUint()), dba).has_value());
}
// TEST_F(RocksDBStorageTest, SerializeVertexGIDProperties) {
// // serializes vertex's gid, multiple labels and properties
// auto storage_dba = storage.Access(memgraph::storage::IsolationLevel::READ_UNCOMMITTED);
// memgraph::query::DbAccessor dba(&storage_dba);
// // prepare labels
// std::vector<memgraph::storage::LabelId> label_ids{dba.NameToLabel("Player"), dba.NameToLabel("Person"),
// dba.NameToLabel("Ball")};
// // prepare properties
// std::map<memgraph::storage::PropertyId, memgraph::storage::PropertyValue> properties;
// properties.emplace(dba.NameToProperty("name"), memgraph::storage::PropertyValue("disk"));
// properties.emplace(dba.NameToProperty("memory"), memgraph::storage::PropertyValue("1TB"));
// properties.emplace(dba.NameToProperty("price"), memgraph::storage::PropertyValue(1000.21));
// // gids
// std::unordered_set<uint64_t> gids;
// for (int i = 0; i < 5; ++i) {
// gids.insert(i);
// auto impl = dba.InsertVertex();
// impl.SetGid(memgraph::storage::Gid::FromUint(i));
// impl.AddLabel(label_ids[i % 3]);
// impl.AddLabel(label_ids[(i + 1) % 3]);
// memgraph::query::MultiPropsInitChecked(&impl, properties);
// db.StoreVertex(impl);
// }
// // load vertices from disk
// auto loaded_vertices = db.Vertices(dba);
// ASSERT_EQ(loaded_vertices.size(), 5);
// for (const auto &vertex_acc : loaded_vertices) {
// ASSERT_TRUE(gids.contains(vertex_acc.Gid().AsUint()));
// // labels
// auto labels = vertex_acc.Labels(memgraph::storage::View::OLD);
// ASSERT_EQ(labels->size(), 2);
// ASSERT_TRUE(std::all_of(labels->begin(), labels->end(), [&label_ids](const auto &label_id) {
// return std::find(label_ids.begin(), label_ids.end(), label_id) != label_ids.end();
// }));
// // check properties
// auto props = vertex_acc.Properties(memgraph::storage::View::OLD);
// ASSERT_FALSE(props.HasError());
// auto prop_name = vertex_acc.GetProperty(memgraph::storage::View::OLD, dba.NameToProperty("name"));
// auto prop_memory = vertex_acc.GetProperty(memgraph::storage::View::OLD, dba.NameToProperty("memory"));
// auto prop_price = vertex_acc.GetProperty(memgraph::storage::View::OLD, dba.NameToProperty("price"));
// auto prop_unexisting = vertex_acc.GetProperty(memgraph::storage::View::OLD, dba.NameToProperty("random"));
// ASSERT_TRUE(prop_name->IsString());
// ASSERT_EQ(prop_name->ValueString(), "disk");
// ASSERT_TRUE(prop_memory->IsString());
// ASSERT_EQ(prop_memory->ValueString(), "1TB");
// ASSERT_TRUE(prop_price->IsDouble());
// ASSERT_DOUBLE_EQ(prop_price->ValueDouble(), 1000.21);
// ASSERT_TRUE(prop_unexisting->IsNull());
// }
// }