Multiple labels serialization test cases

This commit is contained in:
Andi Skrgat 2023-04-04 11:12:58 +02:00
parent ceb3e67f8e
commit e193d4036e
15 changed files with 238 additions and 58 deletions

View File

@ -182,6 +182,8 @@ class VertexAccessor final {
storage::Gid Gid() const noexcept { return impl_.Gid(); }
void SetGid(storage::Gid gid) { impl_.SetGid(gid); }
bool operator==(const VertexAccessor &v) const noexcept {
static_assert(noexcept(impl_ == v.impl_));
return impl_ == v.impl_;

View File

@ -493,7 +493,6 @@ UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
auto vertices = [this](Frame &, ExecutionContext &context) {
auto *db = context.db_accessor;
auto vertices = std::make_optional(db->Vertices(view_));
const std::vector<storage::Vertex> disk_vertices = context.disk_db->Vertices();
return vertices;
};
return MakeUniqueCursorPtr<ScanAllCursor<decltype(vertices)>>(mem, output_symbol_, input_->MakeCursor(mem), view_,

View File

@ -15,6 +15,8 @@
#include <rocksdb/iterator.h>
#include <rocksdb/options.h>
#include <rocksdb/status.h>
#include <numeric>
#include <stdexcept>
#include <string>
#include <string_view>
#include "query/db_accessor.hpp"
@ -27,23 +29,27 @@
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage/v2/view.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
#include "utils/logging.hpp"
#include "utils/string.hpp"
namespace memgraph::storage::rocks {
class RocksDBStorage {
public:
explicit RocksDBStorage() {
// options_.create_if_missing = true;
options_.create_if_missing = true;
options_.OptimizeLevelStyleCompaction();
std::filesystem::path rocksdb_path = "rocks_experiment";
std::filesystem::path rocksdb_path = "./rocks_experiment_test";
if (!memgraph::utils::EnsureDir(rocksdb_path)) {
SPDLOG_ERROR("Unable to create storage folder on disk.");
// TODO: throw some error
}
rocksdb::Status status = rocksdb::DB::Open(options_, rocksdb_path, &db_);
MG_ASSERT(status.ok());
InsertStartingVertices();
if (!status.ok()) {
spdlog::error(status.ToString());
}
}
~RocksDBStorage() {
@ -52,60 +58,93 @@ class RocksDBStorage {
delete db_;
}
// receives just a mock of vertex data needed for serializing vertex (used as a key in the RocksDB)
std::string SerializeVertex(const std::vector<std::string> labels, const storage::Gid &gid) {
std::string result;
std::string ser_Alabels =
std::accumulate(labels.begin(), labels.end(), result,
[](const std::string &join, const std::string &label) { return join + ":" + label; });
return result;
/*
Serialize and store in-memory vertex to the disk.
*/
bool StoreVertex(const query::VertexAccessor &vertex_acc) {
const std::string key = SerializeVertex(vertex_acc);
rocksdb::Status status = db_->Put(rocksdb::WriteOptions(), key, "properties");
return status.ok();
}
void InsertStartingVertices() {
std::vector<std::vector<std::string>> labels{{"Person", "Player"}, {"Person", "Referee"}, {"Ball"}};
for (int64_t i = 0; i < 10; ++i) {
std::string key = SerializeVertex(labels[i % 3], storage::Gid::FromUint(i + 1000));
rocksdb::Status status = db_->Put(rocksdb::WriteOptions(), key, "properties_" + std::to_string(i));
MG_ASSERT(status.ok());
/*
Clear all entries from the database.
*/
void Clear() {
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
db_->Delete(rocksdb::WriteOptions(), it->key().ToString());
}
}
/*
Read all vertices stored in the database.
*/
std::vector<storage::Vertex> Vertices() {
std::vector<storage::Vertex> vertices;
std::vector<std::unique_ptr<query::VertexAccessor>> Vertices(memgraph::query::DbAccessor &dba) {
std::vector<std::unique_ptr<query::VertexAccessor>> vertices;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
uint64_t key = std::stoull(it->key().ToString());
std::string key = it->key().ToString();
std::string value = it->value().ToString();
spdlog::debug("Key: {} Value: {}", key, value);
vertices.emplace_back(Gid::FromUint(key), nullptr);
auto res = DeserializeVertex(key, dba);
vertices.push_back(std::move(res));
}
return vertices;
}
/*
Read all vertices that have a specified label.
*/
std::vector<storage::Vertex> Vertices(const std::string_view label) {
std::vector<storage::Vertex> vertices;
rocksdb::Iterator *it = db_->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
uint64_t key = std::stoull(it->key().ToString());
std::string value = it->value().ToString();
spdlog::debug("Key: {} Value: {}", key, value);
vertices.emplace_back(Gid::FromUint(key), nullptr);
protected:
std::string SerializeLabels(auto &&labels) {
if (labels.HasError() || (*labels).empty()) {
return "";
}
return vertices;
std::string result = std::to_string((*labels)[0].AsUint());
std::string ser_labels = std::accumulate(
std::next((*labels).begin()), (*labels).end(), result,
[](const std::string &join, const auto &label_id) { return join + "," + std::to_string(label_id.AsUint()); });
return ser_labels;
}
/*
Serialize and store in-memory vertex to the disk.
*/
bool StoreVertex(query::VertexAccessor *vertex) {
rocksdb::Status status = db_->Put(rocksdb::WriteOptions(), std::to_string(vertex->Gid().AsUint()), "properties");
return status.ok();
std::string SerializeGid(const memgraph::storage::Gid &gid) { return std::to_string(gid.AsUint()); }
std::string SerializeVertex(const query::VertexAccessor &vertex_acc) {
// Serialize labels
std::string result = SerializeLabels(vertex_acc.Labels(storage::View::OLD));
result += "|";
// Serialize gid
result += SerializeGid(vertex_acc.Gid());
spdlog::info("Serialized vertex: {}", result);
return result;
}
std::unique_ptr<query::VertexAccessor> DeserializeVertex(const std::string_view key,
memgraph::query::DbAccessor &dba) {
// Create vertex
auto impl = std::make_unique<memgraph::query::VertexAccessor>(dba.InsertVertex());
spdlog::info("Key to deserialize: {}", key);
const auto vertex_parts = utils::Split(key, "|");
const storage::Gid gid = storage::Gid::FromUint(std::stoull(vertex_parts[1]));
impl->SetGid(gid);
if (!vertex_parts[0].empty()) {
const auto labels = utils::Split(vertex_parts[0], ",");
for (const auto &label : labels) {
const storage::LabelId label_id = storage::LabelId::FromUint(std::stoull(label));
auto maybe_error = impl->AddLabel(label_id);
if (maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case storage::Error::SERIALIZATION_ERROR:
throw utils::BasicException("Serialization");
case storage::Error::DELETED_OBJECT:
throw utils::BasicException("Trying to set a label on a deleted node.");
case storage::Error::VERTEX_HAS_EDGES:
case storage::Error::PROPERTIES_DISABLED:
case storage::Error::NONEXISTENT_OBJECT:
throw utils::BasicException("Unexpected error when setting a label.");
}
}
}
}
return impl;
}
private:

View File

@ -169,7 +169,8 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {
labels = vertex_->labels;
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &labels](const Delta &delta) {
return std::move(labels);
/*ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &labels](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
// Remove the label because we don't see the addition.
@ -205,6 +206,7 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return std::move(labels);
*/
}
Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const PropertyValue &value) {

View File

@ -13,6 +13,7 @@
#include <optional>
#include "storage/v2/id_types.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/config.hpp"
@ -101,6 +102,8 @@ class VertexAccessor final {
Gid Gid() const noexcept { return vertex_->gid; }
void SetGid(storage::Gid gid_) { vertex_->gid = storage::Gid::FromUint(gid_.AsUint()); }
bool operator==(const VertexAccessor &other) const noexcept {
return vertex_ == other.vertex_ && transaction_ == other.transaction_;
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -21,6 +21,7 @@
class ExpansionBenchFixture : public benchmark::Fixture {
protected:
std::optional<memgraph::storage::Storage> db;
memgraph::storage::rocks::RocksDBStorage disk_db_;
std::optional<memgraph::query::InterpreterContext> interpreter_context;
std::optional<memgraph::query::Interpreter> interpreter;
std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "expansion-benchmark"};
@ -47,7 +48,7 @@ class ExpansionBenchFixture : public benchmark::Fixture {
MG_ASSERT(!db->CreateIndex(label).HasError());
interpreter_context.emplace(&*db, memgraph::query::InterpreterConfig{}, data_directory);
interpreter_context.emplace(&*db, &disk_db_, memgraph::query::InterpreterConfig{}, data_directory);
interpreter.emplace(&*interpreter_context);
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -27,11 +27,13 @@ int main(int argc, char *argv[]) {
}
memgraph::storage::Storage db;
memgraph::storage::rocks::RocksDBStorage disk_db;
auto data_directory = std::filesystem::temp_directory_path() / "single_query_test";
memgraph::utils::OnScopeExit([&data_directory] { std::filesystem::remove_all(data_directory); });
memgraph::license::global_license_checker.EnableTesting();
memgraph::query::InterpreterContext interpreter_context{&db, memgraph::query::InterpreterConfig{}, data_directory};
memgraph::query::InterpreterContext interpreter_context{&db, &disk_db, memgraph::query::InterpreterConfig{},
data_directory};
memgraph::query::Interpreter interpreter{&interpreter_context};
ResultStreamFaker stream(&db);

View File

@ -303,6 +303,9 @@ target_link_libraries(${test_prefix}storage_v2_decoder_encoder mg-storage-v2)
add_unit_test(storage_v2_durability.cpp)
target_link_libraries(${test_prefix}storage_v2_durability mg-storage-v2)
add_unit_test(storage_rocks.cpp)
target_link_libraries(${test_prefix}storage_rocks mg-storage-v2)
add_unit_test(storage_v2_edge.cpp)
target_link_libraries(${test_prefix}storage_v2_edge mg-storage-v2)

View File

@ -49,8 +49,9 @@ auto ToEdgeList(const memgraph::communication::bolt::Value &v) {
class InterpreterTest : public ::testing::Test {
public:
memgraph::storage::Storage db_;
memgraph::storage::rocks::RocksDBStorage disk_db;
std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "MG_tests_unit_interpreter"};
memgraph::query::InterpreterContext interpreter_context{&db_, {}, data_directory};
memgraph::query::InterpreterContext interpreter_context{&db_, &disk_db, {}, data_directory};
InterpreterFaker default_interpreter{&interpreter_context};
@ -1066,7 +1067,7 @@ TEST_F(InterpreterTest, AllowLoadCsvConfig) {
"row"};
memgraph::query::InterpreterContext csv_interpreter_context{
&db_, {.query = {.allow_load_csv = allow_load_csv}}, directory_manager.Path()};
&db_, &disk_db, {.query = {.allow_load_csv = allow_load_csv}}, directory_manager.Path()};
InterpreterFaker interpreter_faker{&csv_interpreter_context};
for (const auto &query : queries) {
if (allow_load_csv) {

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -202,8 +202,9 @@ DatabaseState GetState(memgraph::storage::Storage *db) {
}
auto Execute(memgraph::storage::Storage *db, const std::string &query) {
memgraph::storage::rocks::RocksDBStorage disk_db;
auto data_directory = std::filesystem::temp_directory_path() / "MG_tests_unit_query_dump";
memgraph::query::InterpreterContext context(db, memgraph::query::InterpreterConfig{}, data_directory);
memgraph::query::InterpreterContext context(db, &disk_db, memgraph::query::InterpreterConfig{}, data_directory);
memgraph::query::Interpreter interpreter(&context);
ResultStreamFaker stream(db);
@ -757,8 +758,10 @@ TEST(DumpTest, ExecuteDumpDatabase) {
class StatefulInterpreter {
public:
explicit StatefulInterpreter(memgraph::storage::Storage *db)
: db_(db), context_(db_, memgraph::query::InterpreterConfig{}, data_directory_), interpreter_(&context_) {}
explicit StatefulInterpreter(memgraph::storage::Storage *db, memgraph::storage::rocks::RocksDBStorage *disk_db)
: db_(db),
context_(db_, disk_db, memgraph::query::InterpreterConfig{}, data_directory_),
interpreter_(&context_) {}
auto Execute(const std::string &query) {
ResultStreamFaker stream(db_);
@ -785,7 +788,8 @@ const std::filesystem::path StatefulInterpreter::data_directory_{std::filesystem
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(DumpTest, ExecuteDumpDatabaseInMulticommandTransaction) {
memgraph::storage::Storage db;
StatefulInterpreter interpreter(&db);
memgraph::storage::rocks::RocksDBStorage disk_db;
StatefulInterpreter interpreter(&db, &disk_db);
// Begin the transaction before the vertex is created.
interpreter.Execute("BEGIN");

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -28,6 +28,7 @@ DECLARE_bool(query_cost_planner);
class QueryExecution : public testing::Test {
protected:
std::optional<memgraph::storage::Storage> db_;
std::optional<memgraph::storage::rocks::RocksDBStorage> disk_db_;
std::optional<memgraph::query::InterpreterContext> interpreter_context_;
std::optional<memgraph::query::Interpreter> interpreter_;
@ -35,7 +36,8 @@ class QueryExecution : public testing::Test {
void SetUp() {
db_.emplace();
interpreter_context_.emplace(&*db_, memgraph::query::InterpreterConfig{}, data_directory);
disk_db_.emplace();
interpreter_context_.emplace(&*db_, &*disk_db_, memgraph::query::InterpreterConfig{}, data_directory);
interpreter_.emplace(&*interpreter_context_);
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -22,6 +22,7 @@
#include "query/config.hpp"
#include "query/interpreter.hpp"
#include "query/stream/streams.hpp"
#include "storage/rocks/storage.hpp"
#include "storage/v2/storage.hpp"
#include "test_utils.hpp"
@ -55,6 +56,7 @@ class StreamsTest : public ::testing::Test {
protected:
memgraph::storage::Storage db_;
memgraph::storage::rocks::RocksDBStorage disk_db_;
std::filesystem::path data_directory_{GetCleanDataDirectory()};
KafkaClusterMock mock_cluster_{std::vector<std::string>{kTopicName}};
// Though there is a Streams object in interpreter context, it makes more sense to use a separate object to test,
@ -62,7 +64,8 @@ class StreamsTest : public ::testing::Test {
// Streams constructor.
// InterpreterContext::auth_checker_ is used in the Streams object, but only in the message processing part. Because
// these tests don't send any messages, the auth_checker_ pointer can be left as nullptr.
memgraph::query::InterpreterContext interpreter_context_{&db_, memgraph::query::InterpreterConfig{}, data_directory_};
memgraph::query::InterpreterContext interpreter_context_{&db_, &disk_db_, memgraph::query::InterpreterConfig{},
data_directory_};
std::filesystem::path streams_data_directory_{data_directory_ / "separate-dir-for-test"};
std::optional<Streams> streams_;

View File

@ -0,0 +1,117 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <unordered_set>
#include "query/db_accessor.hpp"
#include "storage/rocks/storage.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage/v2/view.hpp"
class RocksDBStorageTest : public ::testing::TestWithParam<bool> {};
TEST(RocksDBStorageTest, SerializeVertexGID) {
// TODO: expose transaction of the original storage
memgraph::storage::Storage storage;
memgraph::storage::rocks::RocksDBStorage db;
auto storage_dba = storage.Access(memgraph::storage::IsolationLevel::READ_UNCOMMITTED);
memgraph::query::DbAccessor dba(&storage_dba);
std::unordered_set<uint64_t> gids;
for (uint64_t i = 0; i < 5; ++i) {
gids.insert(i);
auto impl = dba.InsertVertex();
impl.SetGid(memgraph::storage::Gid::FromUint(i));
db.StoreVertex(impl);
}
// load vertices from disk
auto loaded_vertices = db.Vertices(dba);
db.Clear();
ASSERT_EQ(loaded_vertices.size(), 5);
for (const auto &vertex_acc : loaded_vertices) {
ASSERT_TRUE(gids.contains(vertex_acc->Gid().AsUint()));
}
}
TEST(RocksDBStorageTest, SerializeVertexGIDLabels) {
memgraph::storage::Storage storage;
memgraph::storage::rocks::RocksDBStorage db;
auto storage_dba = storage.Access(memgraph::storage::IsolationLevel::READ_UNCOMMITTED);
memgraph::query::DbAccessor dba(&storage_dba);
// save vertices on disk
std::vector<memgraph::storage::Vertex> vertices;
std::vector<std::string> labels{"Player", "Person", "Ball"};
std::unordered_set<uint64_t> gids;
std::vector<memgraph::storage::LabelId> label_ids;
for (int i = 0; i < 3; i++) {
label_ids.push_back(dba.NameToLabel(labels[i]));
}
for (int i = 0; i < 5; ++i) {
gids.insert(i);
auto impl = dba.InsertVertex();
impl.SetGid(memgraph::storage::Gid::FromUint(i));
impl.AddLabel(dba.NameToLabel(labels[i % 3]));
db.StoreVertex(impl);
}
// load vertices from disk
auto loaded_vertices = db.Vertices(dba);
db.Clear();
ASSERT_EQ(loaded_vertices.size(), 5);
for (const auto &vertex_acc : loaded_vertices) {
ASSERT_TRUE(gids.contains(vertex_acc->Gid().AsUint()));
auto labels = vertex_acc->Labels(memgraph::storage::View::OLD);
ASSERT_EQ(labels->size(), 1);
ASSERT_TRUE(std::all_of(labels->begin(), labels->end(), [&label_ids](const auto &label_id) {
return std::find(label_ids.begin(), label_ids.end(), label_id) != label_ids.end();
}));
}
}
TEST(RocksDBStorageTest, SerializeVertexGIDMutlipleLabels) {
memgraph::storage::Storage storage;
memgraph::storage::rocks::RocksDBStorage db;
auto storage_dba = storage.Access(memgraph::storage::IsolationLevel::READ_UNCOMMITTED);
memgraph::query::DbAccessor dba(&storage_dba);
// save vertices on disk
std::vector<memgraph::storage::Vertex> vertices;
std::vector<std::string> labels{"Player", "Person", "Ball"};
std::unordered_set<uint64_t> gids;
std::vector<memgraph::storage::LabelId> label_ids;
for (int i = 0; i < 3; i++) {
label_ids.push_back(dba.NameToLabel(labels[i]));
}
for (int i = 0; i < 5; ++i) {
gids.insert(i);
auto impl = dba.InsertVertex();
impl.SetGid(memgraph::storage::Gid::FromUint(i));
impl.AddLabel(label_ids[i % 3]);
impl.AddLabel(label_ids[(i + 1) % 3]);
db.StoreVertex(impl);
}
// load vertices from disk
auto loaded_vertices = db.Vertices(dba);
db.Clear();
ASSERT_EQ(loaded_vertices.size(), 5);
for (const auto &vertex_acc : loaded_vertices) {
ASSERT_TRUE(gids.contains(vertex_acc->Gid().AsUint()));
auto labels = vertex_acc->Labels(memgraph::storage::View::OLD);
ASSERT_EQ(labels->size(), 2);
ASSERT_TRUE(std::all_of(labels->begin(), labels->end(), [&label_ids](const auto &label_id) {
return std::find(label_ids.begin(), label_ids.end(), label_id) != label_ids.end();
}));
}
}

View File

@ -26,8 +26,9 @@ corresponding interpreter/.
class TransactionQueueSimpleTest : public ::testing::Test {
protected:
memgraph::storage::Storage db_;
memgraph::storage::rocks::RocksDBStorage disk_db_;
std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "MG_tests_unit_transaction_queue_intr"};
memgraph::query::InterpreterContext interpreter_context{&db_, {}, data_directory};
memgraph::query::InterpreterContext interpreter_context{&db_, &disk_db_, {}, data_directory};
InterpreterFaker running_interpreter{&interpreter_context}, main_interpreter{&interpreter_context};
};

View File

@ -31,9 +31,10 @@ corresponding interpreter.
class TransactionQueueMultipleTest : public ::testing::Test {
protected:
memgraph::storage::Storage db_;
memgraph::storage::rocks::RocksDBStorage disk_db_;
std::filesystem::path data_directory{std::filesystem::temp_directory_path() /
"MG_tests_unit_transaction_queue_multiple_intr"};
memgraph::query::InterpreterContext interpreter_context{&db_, {}, data_directory};
memgraph::query::InterpreterContext interpreter_context{&db_, &disk_db_, {}, data_directory};
InterpreterFaker main_interpreter{&interpreter_context};
std::vector<InterpreterFaker *> running_interpreters;