This commit is contained in:
Andi Skrgat 2023-04-03 11:08:47 +02:00
parent 7503b304f2
commit f7c1923ab2
4 changed files with 89 additions and 18 deletions

View File

@ -1131,7 +1131,11 @@ using RWType = plan::ReadWriteTypeChecker::RWType;
InterpreterContext::InterpreterContext(storage::Storage *db, storage::rocks::RocksDBStorage *disk_db,
const InterpreterConfig config, const std::filesystem::path &data_directory)
: db(db), trigger_store(data_directory / "triggers"), config(config), streams{this, data_directory / "streams"} {}
: db(db),
disk_db(disk_db),
trigger_store(data_directory / "triggers"),
config(config),
streams{this, data_directory / "streams"} {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");

View File

@ -491,11 +491,8 @@ UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
auto vertices = [this](Frame &, ExecutionContext &context) {
auto *db = context.db_accessor;
// const storage::Vertex disk_vertex = context.disk_db->Vertices();
auto vertices = std::make_optional(db->Vertices(view_));
std::for_each(vertices->begin(), vertices->end(),
[disk_db = context.disk_db](const auto &vertex) { disk_db->StoreVertex(vertex); });
// spdlog::debug("Vertex loaded in the RocksDB {}", disk_vertex.gid.AsUint());
const std::vector<storage::Vertex> disk_vertices = context.disk_db->Vertices();
return vertices;
};
return MakeUniqueCursorPtr<ScanAllCursor<decltype(vertices)>>(mem, output_symbol_, input_->MakeCursor(mem), view_,

View File

@ -11,10 +11,12 @@
#pragma once
#include <numeric>
#include <optional>
#include "query/db_accessor.hpp"
#include "slk/serialization.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage/v2/view.hpp"
@ -25,11 +27,12 @@ class Encoder final {
public:
explicit Encoder(slk::Builder *builder) : builder_(builder) {}
void WriteVertex(const query::VertexAccessor &vertex_acc) {
storage::LabelId label = vertex_acc.Labels(storage::View::OLD)->at(0);
int num_in_edges = *vertex_acc.InDegree(storage::View::OLD);
int num_out_edges = *vertex_acc.OutDegree(storage::View::OLD);
slk::Save(vertex_acc.Gid().AsUint(), builder_);
// too serious, will be used later in the future probably
void SerializeVertex(const query::VertexAccessor &vertex_acc) {
// storage::LabelId label = vertex_acc.Labels(storage::View::OLD)->at(0);
// int num_in_edges = *vertex_acc.InDegree(storage::View::OLD);
// int num_out_edges = *vertex_acc.OutDegree(storage::View::OLD);
// slk::Save(vertex_acc.Gid().AsUint(), builder_);
}
private:

View File

@ -12,42 +12,109 @@
#pragma once
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>
#include <rocksdb/options.h>
#include <rocksdb/status.h>
#include <string>
#include <string_view>
#include "query/db_accessor.hpp"
#include "spdlog/spdlog.h"
#include "storage/rocks/loopback.hpp"
#include "storage/rocks/serialization.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage/v2/view.hpp"
#include "utils/logging.hpp"
namespace memgraph::storage::rocks {
class RocksDBStorage {
public:
explicit RocksDBStorage() {
options_.create_if_missing = true;
// options_.create_if_missing = true;
options_.OptimizeLevelStyleCompaction();
rocksdb::Status status = rocksdb::DB::Open(options_, "~/rocksdb/", &db_);
std::filesystem::path rocksdb_path = "rocks_experiment";
if (!memgraph::utils::EnsureDir(rocksdb_path)) {
SPDLOG_ERROR("Unable to create storage folder on disk.");
// TODO: throw some error
}
rocksdb::Status status = rocksdb::DB::Open(options_, rocksdb_path, &db_);
MG_ASSERT(status.ok());
InsertStartingVertices();
}
~RocksDBStorage() {
rocksdb::Status status = db_->Close();
MG_ASSERT(status.ok());
delete db_;
}
// receives just a mock of vertex data needed for serializing vertex (used as a key in the RocksDB)
std::string SerializeVertex(const std::vector<std::string> labels, const storage::Gid &gid) {
std::string result;
std::string ser_Alabels =
std::accumulate(labels.begin(), labels.end(), result,
[](const std::string &join, const std::string &label) { return join + ":" + label; });
return result;
}
void InsertStartingVertices() {
std::vector<std::vector<std::string>> labels{{"Person", "Player"}, {"Person", "Referee"}, {"Ball"}};
for (int64_t i = 0; i < 10; ++i) {
std::string key = SerializeVertex(labels[i % 3], storage::Gid::FromUint(i + 1000));
rocksdb::Status status = db_->Put(rocksdb::WriteOptions(), key, "properties_" + std::to_string(i));
MG_ASSERT(status.ok());
}
}
/*
Reads all vertices stored in the database.
Read all vertices stored in the database.
*/
storage::Vertex Vertices() { return decoder_.ReadVertex(); }
std::vector<storage::Vertex> Vertices() {
std::vector<storage::Vertex> vertices;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
uint64_t key = std::stoull(it->key().ToString());
std::string value = it->value().ToString();
spdlog::debug("Key: {} Value: {}", key, value);
vertices.emplace_back(Gid::FromUint(key), nullptr);
}
return vertices;
}
/*
Read all vertices that have a specified label.
*/
std::vector<storage::Vertex> Vertices(const std::string_view label) {
std::vector<storage::Vertex> vertices;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
uint64_t key = std::stoull(it->key().ToString());
std::string value = it->value().ToString();
spdlog::debug("Key: {} Value: {}", key, value);
vertices.emplace_back(Gid::FromUint(key), nullptr);
}
return vertices;
}
/*
Serialize and store in-memory vertex to the disk.
*/
void StoreVertex(const query::VertexAccessor &vertex) { encoder_.WriteVertex(vertex); }
bool StoreVertex(query::VertexAccessor *vertex) {
rocksdb::Status status = db_->Put(rocksdb::WriteOptions(), std::to_string(vertex->Gid().AsUint()), "properties");
return status.ok();
}
private:
// rocksdb internals
rocksdb::Options options_;
rocksdb::DB *db_;
slk::Loopback loopback_;
slk::Encoder encoder_{loopback_.GetBuilder()};
slk::Decoder decoder_{loopback_.GetReader()};
// slk::Loopback loopback_;
// slk::Encoder encoder_{loopback_.GetBuilder()};
// slk::Decoder decoder_{loopback_.GetReader()};
};
} // namespace memgraph::storage::rocks