POC started

This commit is contained in:
Andi Skrgat 2023-03-31 15:00:08 +02:00
parent 398503da7a
commit 7503b304f2
10 changed files with 213 additions and 4 deletions

View File

@ -9,6 +9,7 @@ add_subdirectory(telemetry)
add_subdirectory(communication)
add_subdirectory(memory)
add_subdirectory(storage/v2)
add_subdirectory(storage/rocks)
add_subdirectory(integrations)
add_subdirectory(query)
add_subdirectory(glue)

View File

@ -98,6 +98,9 @@
#ifdef MG_ENTERPRISE
#include "audit/log.hpp"
#endif
// Disk storage includes
#include "storage/rocks/serialization.hpp"
#include "storage/rocks/storage.hpp"
constexpr const char *kMgUser = "MEMGRAPH_USER";
constexpr const char *kMgPassword = "MEMGRAPH_PASSWORD";
@ -907,10 +910,15 @@ int main(int argc, char **argv) {
}
db_config.durability.snapshot_interval = std::chrono::seconds(FLAGS_storage_snapshot_interval_sec);
}
// here in the future, a specific instantiation of storage type will be created
memgraph::storage::Storage db(db_config);
// for experiments, I will first bind together both storages to make sure we solve serialization correctly
// if we decide to use dynamic polymorphism, the concrete storage object should care about RocksDB details
memgraph::storage::rocks::RocksDBStorage disk_db;
memgraph::query::InterpreterContext interpreter_context{
&db,
&disk_db,
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
.execution_timeout_sec = FLAGS_query_execution_timeout_sec,
.replication_replica_check_frequency = std::chrono::seconds(FLAGS_replication_replica_check_frequency_sec),

View File

@ -20,6 +20,7 @@
#include "query/parameters.hpp"
#include "query/plan/profile.hpp"
#include "query/trigger.hpp"
#include "storage/rocks/storage.hpp"
#include "utils/async_timer.hpp"
namespace memgraph::query {
@ -86,6 +87,7 @@ struct ExecutionContext {
#ifdef MG_ENTERPRISE
std::unique_ptr<FineGrainedAuthChecker> auth_checker{nullptr};
#endif
storage::rocks::RocksDBStorage *disk_db;
};
static_assert(std::is_move_assignable_v<ExecutionContext>, "ExecutionContext must be move assignable!");

View File

@ -50,6 +50,7 @@
#include "query/stream/common.hpp"
#include "query/trigger.hpp"
#include "query/typed_value.hpp"
#include "storage/rocks/storage.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/property_value.hpp"
@ -1015,6 +1016,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
cursor_(plan->plan().MakeCursor(execution_memory)),
frame_(plan->symbol_table().max_position(), execution_memory),
memory_limit_(memory_limit) {
ctx_.disk_db = interpreter_context->disk_db;
ctx_.db_accessor = dba;
ctx_.symbol_table = plan->symbol_table();
ctx_.evaluation_context.timestamp = QueryTimestamp();
@ -1127,8 +1129,8 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
using RWType = plan::ReadWriteTypeChecker::RWType;
} // namespace
InterpreterContext::InterpreterContext(storage::Storage *db, const InterpreterConfig config,
const std::filesystem::path &data_directory)
InterpreterContext::InterpreterContext(storage::Storage *db, storage::rocks::RocksDBStorage *disk_db,
const InterpreterConfig config, const std::filesystem::path &data_directory)
: db(db), trigger_store(data_directory / "triggers"), config(config), streams{this, data_directory / "streams"} {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {

View File

@ -32,6 +32,7 @@
#include "query/stream/streams.hpp"
#include "query/trigger.hpp"
#include "query/typed_value.hpp"
#include "storage/rocks/storage.hpp"
#include "storage/v2/isolation_level.hpp"
#include "utils/event_counter.hpp"
#include "utils/logging.hpp"
@ -208,10 +209,11 @@ class Interpreter;
*
*/
struct InterpreterContext {
explicit InterpreterContext(storage::Storage *db, InterpreterConfig config,
explicit InterpreterContext(storage::Storage *db, storage::rocks::RocksDBStorage *disk_db, InterpreterConfig config,
const std::filesystem::path &data_directory);
storage::Storage *db;
storage::rocks::RocksDBStorage *disk_db;
// ANTLR has singleton instance that is shared between threads. It is
// protected by locks inside of ANTLR. Unfortunately, they are not protected

View File

@ -491,7 +491,12 @@ UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
auto vertices = [this](Frame &, ExecutionContext &context) {
auto *db = context.db_accessor;
return std::make_optional(db->Vertices(view_));
// const storage::Vertex disk_vertex = context.disk_db->Vertices();
auto vertices = std::make_optional(db->Vertices(view_));
std::for_each(vertices->begin(), vertices->end(),
[disk_db = context.disk_db](const auto &vertex) { disk_db->StoreVertex(vertex); });
// spdlog::debug("Vertex loaded in the RocksDB {}", disk_vertex.gid.AsUint());
return vertices;
};
return MakeUniqueCursorPtr<ScanAllCursor<decltype(vertices)>>(mem, output_symbol_, input_->MakeCursor(mem), view_,
std::move(vertices), "ScanAll");

View File

@ -0,0 +1,11 @@
set(storage_rocks_src_files
loopback.hpp
serialization.hpp)
find_package(Threads REQUIRED)
find_package(gflags REQUIRED)
find_package(BZip2 REQUIRED)
find_package(ZLIB REQUIRED)
add_library(mg-storage-rocks STATIC ${storage_rocks_src_files})
target_link_libraries(mg-storage-rocks mg-utils Threads::Threads stdc++fs rocksdb BZip2::BZip2 ZLIB::ZLIB)

View File

@ -0,0 +1,72 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>
#include <fmt/format.h>
#include "slk/streams.hpp"
#include "utils/logging.hpp"
namespace memgraph::slk {
/// Class used for basic SLK use-cases.
/// It creates a `memgraph::slk::Builder` that can be written to. After you
/// have written the data to the builder, you can get a `memgraph::slk::Reader`
/// and try to decode the encoded data.
class Loopback {
public:
~Loopback() {
MG_ASSERT(builder_, "You haven't created a builder!");
MG_ASSERT(reader_, "You haven't created a reader!");
reader_->Finalize();
}
memgraph::slk::Builder *GetBuilder() {
MG_ASSERT(!builder_, "You have already allocated a builder!");
builder_ = std::make_unique<memgraph::slk::Builder>(
[this](const uint8_t *data, size_t size, bool have_more) { Write(data, size, have_more); });
return builder_.get();
}
memgraph::slk::Reader *GetReader() {
MG_ASSERT(builder_, "You must first get a builder before getting a reader!");
MG_ASSERT(!reader_, "You have already allocated a reader!");
builder_->Finalize();
auto ret = memgraph::slk::CheckStreamComplete(data_.data(), data_.size());
MG_ASSERT(ret.status == memgraph::slk::StreamStatus::COMPLETE);
MG_ASSERT(ret.stream_size == data_.size());
size_ = ret.encoded_data_size;
reader_ = std::make_unique<memgraph::slk::Reader>(data_.data(), data_.size());
return reader_.get();
}
size_t size() { return size_; }
private:
void Write(const uint8_t *data, size_t size, bool have_more) {
for (size_t i = 0; i < size; ++i) {
data_.push_back(data[i]);
}
}
std::vector<uint8_t> data_;
std::unique_ptr<memgraph::slk::Builder> builder_;
std::unique_ptr<memgraph::slk::Reader> reader_;
size_t size_{0};
};
} // namespace memgraph::slk

View File

@ -0,0 +1,53 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <optional>
#include "query/db_accessor.hpp"
#include "slk/serialization.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage/v2/view.hpp"
namespace memgraph::slk {
class Encoder final {
public:
explicit Encoder(slk::Builder *builder) : builder_(builder) {}
void WriteVertex(const query::VertexAccessor &vertex_acc) {
storage::LabelId label = vertex_acc.Labels(storage::View::OLD)->at(0);
int num_in_edges = *vertex_acc.InDegree(storage::View::OLD);
int num_out_edges = *vertex_acc.OutDegree(storage::View::OLD);
slk::Save(vertex_acc.Gid().AsUint(), builder_);
}
private:
slk::Builder *builder_;
};
class Decoder final {
public:
explicit Decoder(slk::Reader *reader) : reader_(reader) {}
storage::Vertex ReadVertex() {
int64_t id = 1234;
slk::Load(&id, reader_);
return {storage::Gid::FromUint(id), nullptr};
}
private:
slk::Reader *reader_;
};
} // namespace memgraph::slk

View File

@ -0,0 +1,53 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/status.h>
#include "storage/rocks/loopback.hpp"
#include "storage/rocks/serialization.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/vertex_accessor.hpp"
namespace memgraph::storage::rocks {
class RocksDBStorage {
public:
explicit RocksDBStorage() {
options_.create_if_missing = true;
options_.OptimizeLevelStyleCompaction();
rocksdb::Status status = rocksdb::DB::Open(options_, "~/rocksdb/", &db_);
MG_ASSERT(status.ok());
}
/*
Reads all vertices stored in the database.
*/
storage::Vertex Vertices() { return decoder_.ReadVertex(); }
/*
Serialize and store in-memory vertex to the disk.
*/
void StoreVertex(const query::VertexAccessor &vertex) { encoder_.WriteVertex(vertex); }
private:
// rocksdb internals
rocksdb::Options options_;
rocksdb::DB *db_;
slk::Loopback loopback_;
slk::Encoder encoder_{loopback_.GetBuilder()};
slk::Decoder decoder_{loopback_.GetReader()};
};
} // namespace memgraph::storage::rocks