Code edits

This commit is contained in:
Andi Skrgat 2023-04-12 13:43:47 +02:00
parent c441b7de0b
commit 876359f4b6
6 changed files with 91 additions and 253 deletions

View File

@ -99,7 +99,6 @@
#include "audit/log.hpp"
#endif
// Disk storage includes
#include "storage/rocks/serialization.hpp"
#include "storage/rocks/storage.hpp"
constexpr const char *kMgUser = "MEMGRAPH_USER";
@ -775,8 +774,8 @@ int main(int argc, char **argv) {
// libstd.
auto gil = memgraph::py::EnsureGIL();
// NOLINTNEXTLINE(hicpp-signed-bitwise)
// auto *flag = PyLong_FromLong(RTLD_NOW | RTLD_DEEPBIND);
auto *flag = PyLong_FromLong(RTLD_NOW);
auto *flag = PyLong_FromLong(RTLD_NOW | RTLD_DEEPBIND);
// auto *flag = PyLong_FromLong(RTLD_NOW);
auto *setdl = PySys_GetObject("setdlopenflags");
MG_ASSERT(setdl);
auto *arg = PyTuple_New(1);

View File

@ -1,6 +1,5 @@
set(storage_rocks_src_files
loopback.hpp
serialization.hpp)
storage.hpp)
find_package(Threads REQUIRED)
find_package(gflags REQUIRED)

View File

@ -1,72 +0,0 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>
#include <fmt/format.h>
#include "slk/streams.hpp"
#include "utils/logging.hpp"
namespace memgraph::slk {
/// Class used for basic SLK use-cases.
/// It creates a `memgraph::slk::Builder` that can be written to. After you
/// have written the data to the builder, you can get a `memgraph::slk::Reader`
/// and try to decode the encoded data.
class Loopback {
public:
~Loopback() {
MG_ASSERT(builder_, "You haven't created a builder!");
MG_ASSERT(reader_, "You haven't created a reader!");
reader_->Finalize();
}
memgraph::slk::Builder *GetBuilder() {
MG_ASSERT(!builder_, "You have already allocated a builder!");
builder_ = std::make_unique<memgraph::slk::Builder>(
[this](const uint8_t *data, size_t size, bool have_more) { Write(data, size, have_more); });
return builder_.get();
}
memgraph::slk::Reader *GetReader() {
MG_ASSERT(builder_, "You must first get a builder before getting a reader!");
MG_ASSERT(!reader_, "You have already allocated a reader!");
builder_->Finalize();
auto ret = memgraph::slk::CheckStreamComplete(data_.data(), data_.size());
MG_ASSERT(ret.status == memgraph::slk::StreamStatus::COMPLETE);
MG_ASSERT(ret.stream_size == data_.size());
size_ = ret.encoded_data_size;
reader_ = std::make_unique<memgraph::slk::Reader>(data_.data(), data_.size());
return reader_.get();
}
size_t size() { return size_; }
private:
void Write(const uint8_t *data, size_t size, bool have_more) {
for (size_t i = 0; i < size; ++i) {
data_.push_back(data[i]);
}
}
std::vector<uint8_t> data_;
std::unique_ptr<memgraph::slk::Builder> builder_;
std::unique_ptr<memgraph::slk::Reader> reader_;
size_t size_{0};
};
} // namespace memgraph::slk

View File

@ -1,56 +0,0 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <numeric>
#include <optional>
#include "query/db_accessor.hpp"
#include "slk/serialization.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage/v2/view.hpp"
namespace memgraph::slk {
class Encoder final {
public:
explicit Encoder(slk::Builder *builder) : builder_(builder) {}
// too serious, will be used later in the future probably
void SerializeVertex(const query::VertexAccessor &vertex_acc) {
// storage::LabelId label = vertex_acc.Labels(storage::View::OLD)->at(0);
// int num_in_edges = *vertex_acc.InDegree(storage::View::OLD);
// int num_out_edges = *vertex_acc.OutDegree(storage::View::OLD);
// slk::Save(vertex_acc.Gid().AsUint(), builder_);
}
private:
slk::Builder *builder_;
};
class Decoder final {
public:
explicit Decoder(slk::Reader *reader) : reader_(reader) {}
storage::Vertex ReadVertex() {
int64_t id = 1234;
slk::Load(&id, reader_);
return {storage::Gid::FromUint(id), nullptr};
}
private:
slk::Reader *reader_;
};
} // namespace memgraph::slk

View File

@ -15,46 +15,24 @@
#include <rocksdb/iterator.h>
#include <rocksdb/options.h>
#include <rocksdb/status.h>
#include <boost/filesystem.hpp>
#include <iterator>
#include <numeric>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <tuple>
#include <vector>
#include "query/common.hpp"
#include "query/db_accessor.hpp"
#include "spdlog/spdlog.h"
#include "storage/rocks/loopback.hpp"
#include "storage/rocks/serialization.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/property_store.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/result.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage/v2/view.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
#include "utils/file.hpp"
#include "utils/logging.hpp"
#include "utils/string.hpp"
namespace memgraph::storage::rocks {
constexpr const char *vertexHandle = "vertex";
constexpr const char *edgeHandle = "edge";
constexpr const char *outEdgeDirection = "0";
constexpr const char *inEdgeDirection = "1";
/// Use it for operations that must successfully finish.
inline void AssertRocksDBStatus(const rocksdb::Status &status) {
MG_ASSERT(status.ok(), "rocksdb: {}", status.ToString());
}
inline bool CheckRocksDBStatus(const rocksdb::Status &status) {
if (!status.ok()) {
if (!status.ok()) [[unlikely]] {
spdlog::error("rocksdb: {}", status.ToString());
}
return status.ok();
@ -68,11 +46,14 @@ class RocksDBStorage {
std::filesystem::path rocksdb_path = "./rocks_experiment_unit";
MG_ASSERT(utils::EnsureDir(rocksdb_path), "Unable to create storage folder on the disk.");
AssertRocksDBStatus(rocksdb::DB::Open(options_, rocksdb_path, &db_));
AssertRocksDBStatus(db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "vertex", &vertex_chandle));
AssertRocksDBStatus(db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "edge", &edge_chandle));
AssertRocksDBStatus(db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), vertexHandle, &vertex_chandle));
AssertRocksDBStatus(db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), edgeHandle, &edge_chandle));
}
// TODO: explicitly delete other constructors
RocksDBStorage(const RocksDBStorage &) = delete;
RocksDBStorage &operator=(const RocksDBStorage &) = delete;
RocksDBStorage &operator=(RocksDBStorage &&) = delete;
RocksDBStorage(RocksDBStorage &&) = delete;
~RocksDBStorage() {
AssertRocksDBStatus(db_->DropColumnFamily(vertex_chandle));
@ -86,61 +67,61 @@ class RocksDBStorage {
// EDGE ACCESSOR FUNCTIONALITIES
// -----------------------------------------------------------
// fetch the edge's source vertex by its GID
/// fetch the edge's source vertex by its GID
std::optional<query::VertexAccessor> FromVertex(const query::EdgeAccessor &edge_acc, query::DbAccessor &dba) {
return FindVertex(SerializeIdType(edge_acc.From().Gid()), dba);
}
// fetch the edge's destination vertex by its GID
/// fetch the edge's destination vertex by its GID
std::optional<query::VertexAccessor> ToVertex(const query::EdgeAccessor &edge_acc, query::DbAccessor &dba) {
return FindVertex(SerializeIdType(edge_acc.To().Gid()), dba);
}
// VERTEX ACCESSOR FUNCTIONALITIES
// ------------------------------------------------------------
/// VERTEX ACCESSOR FUNCTIONALITIES
/// ------------------------------------------------------------
// The VertexAccessor's out edge with gid src_gid has the following format in the RocksDB:
// src_gid | other_vertex_gid | 0 | ...
// other_vertex_gid | src_gid | 1 | ...
// we use the firt way since this should be possible to optimize using Bloom filters and prefix search
/// The VertexAccessor's out edge with gid src_gid has the following format in the RocksDB:
/// src_gid | other_vertex_gid | 0 | ...
/// other_vertex_gid | src_gid | 1 | ...
/// We use the firt way since this should be possible to optimize using Bloom filters and prefix search
std::vector<query::EdgeAccessor> OutEdges(const query::VertexAccessor &vertex_acc, query::DbAccessor &dba) {
const auto vertex_acc_gid = SerializeIdType(vertex_acc.Gid());
std::vector<query::EdgeAccessor> out_edges;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), edge_chandle);
auto it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions(), edge_chandle));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
const std::string_view key = it->key().ToStringView();
const auto vertex_parts = utils::Split(key, "|");
if (vertex_parts[0] == SerializeIdType(vertex_acc.Gid()) && vertex_parts[2] == "0") {
if (vertex_parts[0] == vertex_acc_gid && vertex_parts[2] == outEdgeDirection) {
out_edges.push_back(DeserializeEdge(key, it->value().ToStringView(), dba));
}
}
delete it;
return out_edges;
}
// The VertexAccessor's out edge with gid src_gid has the following format in the RocksDB:
// other_vertex_gid | dest_gid | 0 | ...
// dest_gid | other_verte_gid | 1 | ...
// we use the second way since this should be possible to optimize using Bloom filters and prefix search.
/// The VertexAccessor's out edge with gid src_gid has the following format in the RocksDB:
/// other_vertex_gid | dest_gid | 0 | ...
/// dest_gid | other_verte_gid | 1 | ...
/// we use the second way since this should be possible to optimize using Bloom filters and prefix search.
std::vector<query::EdgeAccessor> InEdges(const query::VertexAccessor &vertex_acc, query::DbAccessor &dba) {
const auto vertex_acc_gid = SerializeIdType(vertex_acc.Gid());
std::vector<query::EdgeAccessor> in_edges;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), edge_chandle);
auto it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions(), edge_chandle));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
const std::string_view key = it->key().ToStringView();
const auto vertex_parts = utils::Split(key, "|");
if (vertex_parts[0] == SerializeIdType(vertex_acc.Gid()) && vertex_parts[2] == "1") {
if (vertex_parts[0] == vertex_acc_gid && vertex_parts[2] == inEdgeDirection) {
in_edges.push_back(DeserializeEdge(key, it->value().ToStringView(), dba));
}
}
delete it;
return in_edges;
}
// TODO: how will we handle new vertex creation
/// TODO: how will we handle new vertex creation
// STORAGE ACCESSOR FUNCTIONALITIES
// -----------------------------------------------------------
/// STORAGE ACCESSOR FUNCTIONALITIES
/// -----------------------------------------------------------
// TODO: how will we handle new edge creation
/// TODO: how will we handle new edge creation
/// @return Accessor to the deleted edge if a deletion took place, std::nullopt otherwise.
/// Delete two edge entries since on edge is represented on a two-fold level.
@ -198,19 +179,17 @@ class RocksDBStorage {
return std::nullopt;
}
// STORING
// -----------------------------------------------------------
/// STORING
/// -----------------------------------------------------------
/// Serialize and store in-memory vertex to the disk.
/// TODO: write the exact format
/// Properties are serialized as the value
void StoreVertex(const query::VertexAccessor &vertex_acc) {
AssertRocksDBStatus(db_->Put(rocksdb::WriteOptions(), vertex_chandle, SerializeVertex(vertex_acc),
SerializeProperties(vertex_acc.PropertyStore())));
}
/// TODO: remove config being sent as the parameter. Later will be added as the part of the storage accessor as in the
/// memory version. for now assume that we always operate with edges having properties
/// Store edge as two key-value entries in the RocksDB.
void StoreEdge(const query::EdgeAccessor &edge_acc) {
auto [src_dest_key, dest_src_key] = SerializeEdge(edge_acc);
const std::string value = SerializeProperties(edge_acc.PropertyStore());
@ -218,27 +197,26 @@ class RocksDBStorage {
AssertRocksDBStatus(db_->Put(rocksdb::WriteOptions(), edge_chandle, dest_src_key, value));
}
// UPDATE PART
// -----------------------------------------------------------
/// UPDATE PART
/// -----------------------------------------------------------
/// Clear all entries from the database.
/// TODO: check if this deletes all entries, or you also need to specify handle here
/// TODO: This will not be needed in the production code and can possibly removed in testing
void Clear() {
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions());
auto it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
db_->Delete(rocksdb::WriteOptions(), it->key().ToString());
}
delete it;
}
// READ PART
// -----------------------------------------------------------
/// READ PART
/// -----------------------------------------------------------
/// TODO: if the need comes for using also a GID object, use std::variant
/// This should again be changed when we have mulitple same vertices
std::optional<query::VertexAccessor> FindVertex(const std::string_view gid, query::DbAccessor &dba) {
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle);
auto it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle));
std::optional<query::VertexAccessor> result = {};
for (it->SeekToFirst(); it->Valid(); it->Next()) {
const auto &key = it->key().ToString();
@ -247,58 +225,51 @@ class RocksDBStorage {
break;
}
}
delete it;
return result;
}
/// Read all vertices stored in the database by a label
/// TODO: rewrite the code with some lambda operations
/// certainly can be a bit optimized so that new object isn't created if can be discarded
/// Get all vertices by a label.
std::vector<query::VertexAccessor> Vertices(query::DbAccessor &dba, const storage::LabelId &label_id) {
std::vector<query::VertexAccessor> vertices;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto vertex = DeserializeVertex(it->key().ToStringView(), it->value().ToStringView(), dba);
if (const auto res = vertex.HasLabel(storage::View::OLD, label_id); !res.HasError() && *res) {
vertices.push_back(vertex);
}
}
delete it;
return vertices;
return Vertices(dba, [label_id](const auto &vertex) {
const auto res = vertex.HasLabel(storage::View::OLD, label_id);
return !res.HasError() && *res;
});
}
/// TODO: unique ptr on iterators
/// TODO: rewrite the code with means of lambda operation
/// TODO: we need to this, otherwise we will have to change a lot of things as we are dealing on the low level
/// certainly can be a bit optimized so that new object isn't created if can be discarded
/// Read all vertices stored in the database by a property
std::vector<query::VertexAccessor> Vertices(query::DbAccessor &dba, const storage::PropertyId &property_id,
const storage::PropertyValue &prop_value) {
const storage::PropertyValue &property_value) {
return Vertices(dba, [property_id, property_value](const auto &vertex) {
const auto res = vertex.GetProperty(storage::View::OLD, property_id);
return !res.HasError() && *res == property_value;
});
}
/// Get all vertices.
std::vector<query::VertexAccessor> Vertices(query::DbAccessor &dba) {
return Vertices(dba, [](const auto & /*vertex*/) { return true; });
}
/// Read all vertices stored in the database and filter them by a lambda function.
std::vector<query::VertexAccessor> Vertices(query::DbAccessor &dba, const auto &vertex_filter) {
std::vector<query::VertexAccessor> vertices;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle);
auto it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto vertex = DeserializeVertex(it->key().ToStringView(), it->value().ToStringView(), dba);
if (const auto res = vertex.GetProperty(storage::View::OLD, property_id); !res.HasError() && *res == prop_value) {
if (vertex_filter(vertex)) {
vertices.push_back(vertex);
}
}
delete it;
return vertices;
}
// Read all vertices stored in the database.
std::vector<query::VertexAccessor> Vertices(query::DbAccessor &dba) {
std::vector<query::VertexAccessor> vertices;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions(), vertex_chandle);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
vertices.push_back(DeserializeVertex(it->key().ToStringView(), it->value().ToStringView(), dba));
}
delete it;
return vertices;
}
protected:
private:
/// Serialization of properties is done by saving the property store buffer
/// If the data is stored in the local buffer of the property store, data from the buffer is copied to the string
/// If the data is stored in some external buffer, the data is read from that location and copied to the string
inline std::string SerializeProperties(const auto &&properties) { return properties; }
/// Serialize labels delimitied by | to string
std::string SerializeLabels(const auto &&labels) {
if (labels.HasError() || (*labels).empty()) {
return "";
@ -310,19 +281,22 @@ class RocksDBStorage {
return ser_labels;
}
/// TODO: write a documentation for the method
/// TODO: add deserialization equivalent method
/// Serializes id type to string
inline std::string SerializeIdType(const auto &id) { return std::to_string(id.AsUint()); }
/// Serialize vertex to string
/// The format: | label1,label2,label3 | gid
std::string SerializeVertex(const query::VertexAccessor &vertex_acc) {
std::string result = SerializeLabels(vertex_acc.Labels(storage::View::OLD)) + "|";
result += SerializeIdType(vertex_acc.Gid());
return result;
}
/// Deserialize vertex from string
/// Properties are read from value and set to the vertex later
query::VertexAccessor DeserializeVertex(const std::string_view key, const std::string_view value,
query::DbAccessor &dba) {
// Create vertex
/// Create vertex
auto impl = dba.InsertVertex();
spdlog::info("Key to deserialize: {}", key);
const auto vertex_parts = utils::Split(key, "|");
@ -346,15 +320,14 @@ class RocksDBStorage {
}
}
}
// deserialize gid
impl.SetGid(storage::Gid::FromUint(std::stoull(vertex_parts[1])));
// deserialize properties
impl.SetPropertyStore(value);
return impl;
}
/// Serializes edge accessor to obtain a key for the key-value store
/// returns two string because there will be two keys since edge is stored in both directions
/// Serializes edge accessor to obtain a key for the key-value store.
/// @return two strings because there will be two keys since edge is stored in both directions.
// | from_gid | to_gid | direction | edge_type | edge_gid
std::pair<std::string, std::string> SerializeEdge(const query::EdgeAccessor &edge_acc) {
// Serialized objects
auto from_gid = SerializeIdType(edge_acc.From().Gid());
@ -364,19 +337,20 @@ class RocksDBStorage {
// source->destination key
std::string src_dest_key = from_gid + "|";
src_dest_key += to_gid + "|";
src_dest_key += "0|";
src_dest_key += edge_type + "|";
src_dest_key += outEdgeDirection;
src_dest_key += "|" + edge_type + "|";
src_dest_key += edge_gid;
// destination->source key
std::string dest_src_key = to_gid + "|";
dest_src_key += from_gid + "|";
dest_src_key += "1|";
dest_src_key += edge_type + "|";
dest_src_key += inEdgeDirection;
dest_src_key += "|" + edge_type + "|";
dest_src_key += edge_gid;
return {src_dest_key, dest_src_key};
}
// deserialize edge from the given key-value
/// Deserialize edge from the given key-value.
/// Properties are read from value and set to the edge later.
query::EdgeAccessor DeserializeEdge(const std::string_view key, const std::string_view value,
query::DbAccessor &dba) {
const auto edge_parts = utils::Split(key, "|");
@ -395,19 +369,15 @@ class RocksDBStorage {
if (!from_acc.has_value() || !to_acc.has_value()) {
throw utils::BasicException("Non-existing vertices during edge deserialization");
}
// TODO: remove to deserialization edge type id method
const auto edge_type_id = storage::EdgeTypeId::FromUint(std::stoull(edge_parts[3]));
// TODO: remove to deserialization edge type id method
const auto edge_gid = storage::Gid::FromUint(std::stoull(edge_parts[4]));
auto maybe_edge = dba.InsertEdge(&*from_acc, &*to_acc, edge_type_id);
MG_ASSERT(maybe_edge.HasValue());
// in the new storage API, setting gid must be done atomically
maybe_edge->SetGid(edge_gid);
maybe_edge->SetGid(storage::Gid::FromUint(std::stoull(edge_parts[4])));
maybe_edge->SetPropertyStore(value);
return *maybe_edge;
}
private:
rocksdb::Options options_;
rocksdb::DB *db_;
rocksdb::ColumnFamilyHandle *vertex_chandle = nullptr;

View File

@ -323,8 +323,7 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view
properties = vertex_->properties.Properties();
delta = vertex_->delta;
}
return std::move(properties);
/*
// return std::move(properties);
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &properties](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
@ -362,7 +361,6 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return std::move(properties);
*/
}
Result<std::vector<EdgeAccessor>> VertexAccessor::InEdges(View view, const std::vector<EdgeTypeId> &edge_types,