From b5413c6f823e89aaaaa9958e88c47bfff0709e21 Mon Sep 17 00:00:00 2001 From: Andi Date: Tue, 5 Sep 2023 19:00:53 +0200 Subject: [PATCH] Add edge import mode into the on-disk storage (#1157) --- src/query/exceptions.hpp | 21 +- src/query/frontend/ast/ast.cpp | 4 + src/query/frontend/ast/ast.hpp | 23 + src/query/frontend/ast/ast_visitor.hpp | 3 +- .../frontend/ast/cypher_main_visitor.cpp | 11 + .../frontend/ast/cypher_main_visitor.hpp | 5 + .../opencypher/grammar/MemgraphCypher.g4 | 9 +- .../opencypher/grammar/MemgraphCypherLexer.g4 | 6 +- .../frontend/semantic/required_privileges.cpp | 2 + .../frontend/stripped_lexer_constants.hpp | 2 +- src/query/interpreter.cpp | 42 + src/storage/v2/CMakeLists.txt | 1 + .../v2/disk/edge_import_mode_cache.cpp | 84 ++ .../v2/disk/edge_import_mode_cache.hpp | 74 ++ src/storage/v2/disk/label_index.cpp | 5 +- src/storage/v2/disk/label_index.hpp | 6 +- src/storage/v2/disk/label_property_index.cpp | 6 +- src/storage/v2/disk/label_property_index.hpp | 6 +- src/storage/v2/disk/rocksdb_storage.cpp | 14 +- src/storage/v2/disk/rocksdb_storage.hpp | 7 +- src/storage/v2/disk/storage.cpp | 773 ++++++++++++------ src/storage/v2/disk/storage.hpp | 127 +-- src/storage/v2/edge_accessor.cpp | 5 + src/storage/v2/edge_accessor.hpp | 3 +- src/storage/v2/edge_import_mode.hpp | 28 + src/storage/v2/indices/indices.cpp | 12 +- src/storage/v2/indices/indices.hpp | 2 +- src/storage/v2/indices/label_index.hpp | 4 +- .../v2/indices/label_property_index.hpp | 4 +- src/storage/v2/inmemory/label_index.cpp | 9 +- src/storage/v2/inmemory/label_index.hpp | 5 +- .../v2/inmemory/label_property_index.cpp | 13 +- .../v2/inmemory/label_property_index.hpp | 6 +- .../replication/replication_server.cpp | 5 +- src/storage/v2/inmemory/storage.cpp | 15 +- src/storage/v2/modified_edge.hpp | 39 + src/storage/v2/mvcc.hpp | 26 +- src/storage/v2/storage.cpp | 6 +- src/storage/v2/transaction.hpp | 21 +- src/storage/v2/vertex_accessor.cpp | 22 + src/utils/disk_utils.hpp | 6 + src/utils/rocksdb_serialization.hpp | 20 +- src/utils/typeinfo.hpp | 1 + tests/e2e/CMakeLists.txt | 1 + tests/e2e/import_mode/CMakeLists.txt | 6 + tests/e2e/import_mode/common.py | 25 + tests/e2e/import_mode/test_command.py | 208 +++++ tests/e2e/import_mode/workloads.yaml | 13 + tests/unit/storage_v2_wal_file.cpp | 2 +- 49 files changed, 1339 insertions(+), 399 deletions(-) create mode 100644 src/storage/v2/disk/edge_import_mode_cache.cpp create mode 100644 src/storage/v2/disk/edge_import_mode_cache.hpp create mode 100644 src/storage/v2/edge_import_mode.hpp create mode 100644 src/storage/v2/modified_edge.hpp create mode 100644 tests/e2e/import_mode/CMakeLists.txt create mode 100644 tests/e2e/import_mode/common.py create mode 100644 tests/e2e/import_mode/test_command.py create mode 100644 tests/e2e/import_mode/workloads.yaml diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index 5c249f7cb..fdb374398 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -158,9 +158,12 @@ class ExplicitTransactionUsageException : public QueryRuntimeException { using QueryRuntimeException::QueryRuntimeException; }; -/** - * An exception for serialization error - */ +class WriteVertexOperationInEdgeImportModeException : public QueryException { + public: + WriteVertexOperationInEdgeImportModeException() + : QueryException("Write operations on vertices are forbidden while the edge import mode is active.") {} +}; + class TransactionSerializationException : public QueryException { public: using QueryException::QueryException; @@ -271,6 +274,12 @@ class StorageModeModificationInMulticommandTxException : public QueryException { : QueryException("Storage mode cannot be modified in multicommand transactions.") {} }; +class EdgeImportModeModificationInMulticommandTxException : public QueryException { + public: + EdgeImportModeModificationInMulticommandTxException() + : QueryException("Edge import mode cannot be modified in multicommand transactions.") {} +}; + class CreateSnapshotInMulticommandTxException final : public QueryException { public: CreateSnapshotInMulticommandTxException() @@ -282,6 +291,12 @@ class CreateSnapshotDisabledOnDiskStorage final : public QueryException { CreateSnapshotDisabledOnDiskStorage() : QueryException("In the on-disk storage mode data is already persistent.") {} }; +class EdgeImportModeQueryDisabledOnDiskStorage final : public QueryException { + public: + EdgeImportModeQueryDisabledOnDiskStorage() + : QueryException("Edge import mode is only allowed for on-disk storage mode.") {} +}; + class SettingConfigInMulticommandTxException final : public QueryException { public: SettingConfigInMulticommandTxException() diff --git a/src/query/frontend/ast/ast.cpp b/src/query/frontend/ast/ast.cpp index ca53f0752..09e54fdcf 100644 --- a/src/query/frontend/ast/ast.cpp +++ b/src/query/frontend/ast/ast.cpp @@ -285,4 +285,8 @@ constexpr utils::TypeInfo query::MultiDatabaseQuery::kType{utils::TypeId::AST_MU constexpr utils::TypeInfo query::ShowDatabasesQuery::kType{utils::TypeId::AST_SHOW_DATABASES, "ShowDatabasesQuery", &query::Query::kType}; + +constexpr utils::TypeInfo query::EdgeImportModeQuery::kType{utils::TypeId::AST_EDGE_IMPORT_MODE_QUERY, + "EdgeImportModeQuery", &query::Query::kType}; + } // namespace memgraph diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index e2ce6061c..1f95ad948 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3003,6 +3003,29 @@ class ReplicationQuery : public memgraph::query::Query { friend class AstStorage; }; +class EdgeImportModeQuery : public memgraph::query::Query { + public: + static const utils::TypeInfo kType; + const utils::TypeInfo &GetTypeInfo() const override { return kType; } + + enum class Status { ACTIVE, INACTIVE }; + + EdgeImportModeQuery() = default; + + DEFVISITABLE(QueryVisitor); + + memgraph::query::EdgeImportModeQuery::Status status_; + + EdgeImportModeQuery *Clone(AstStorage *storage) const override { + auto *object = storage->Create(); + object->status_ = status_; + return object; + } + + private: + friend class AstStorage; +}; + class LockPathQuery : public memgraph::query::Query { public: static const utils::TypeInfo kType; diff --git a/src/query/frontend/ast/ast_visitor.hpp b/src/query/frontend/ast/ast_visitor.hpp index 9bb2cddc6..89b405663 100644 --- a/src/query/frontend/ast/ast_visitor.hpp +++ b/src/query/frontend/ast/ast_visitor.hpp @@ -105,6 +105,7 @@ class TransactionQueueQuery; class Exists; class MultiDatabaseQuery; class ShowDatabasesQuery; +class EdgeImportModeQuery; using TreeCompositeVisitor = utils::CompositeVisitor< SingleQuery, CypherUnion, NamedExpression, OrOperator, XorOperator, AndOperator, NotOperator, AdditionOperator, @@ -143,6 +144,6 @@ class QueryVisitor ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery, FreeMemoryQuery, TriggerQuery, IsolationLevelQuery, CreateSnapshotQuery, StreamQuery, SettingQuery, VersionQuery, ShowConfigQuery, TransactionQueueQuery, StorageModeQuery, AnalyzeGraphQuery, - MultiDatabaseQuery, ShowDatabasesQuery> {}; + MultiDatabaseQuery, ShowDatabasesQuery, EdgeImportModeQuery> {}; } // namespace memgraph::query diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 0b1b875ce..c3f401bda 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -269,6 +269,17 @@ antlrcpp::Any CypherMainVisitor::visitReplicationQuery(MemgraphCypher::Replicati return replication_query; } +antlrcpp::Any CypherMainVisitor::visitEdgeImportModeQuery(MemgraphCypher::EdgeImportModeQueryContext *ctx) { + auto *edge_import_mode_query = storage_->Create(); + if (ctx->ACTIVE()) { + edge_import_mode_query->status_ = EdgeImportModeQuery::Status::ACTIVE; + } else { + edge_import_mode_query->status_ = EdgeImportModeQuery::Status::INACTIVE; + } + query_ = edge_import_mode_query; + return edge_import_mode_query; +} + antlrcpp::Any CypherMainVisitor::visitSetReplicationRole(MemgraphCypher::SetReplicationRoleContext *ctx) { auto *replication_query = storage_->Create(); replication_query->action_ = ReplicationQuery::Action::SET_REPLICATION_ROLE; diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index 181fa773e..3bcafcbaf 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -196,6 +196,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any visitReplicationQuery(MemgraphCypher::ReplicationQueryContext *ctx) override; + /** + * @return EdgeImportMode* + */ + antlrcpp::Any visitEdgeImportModeQuery(MemgraphCypher::EdgeImportModeQueryContext *ctx) override; + /** * @return ReplicationQuery* */ diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index bd9ce6e95..deed2a3e6 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -20,6 +20,7 @@ options { tokenVocab=MemgraphCypherLexer; } import Cypher ; memgraphCypherKeyword : cypherKeyword + | ACTIVE | AFTER | ALTER | ANALYZE @@ -48,6 +49,7 @@ memgraphCypherKeyword : cypherKeyword | DENY | DROP | DUMP + | EDGE | EDGE_TYPES | EXECUTE | FOR @@ -60,9 +62,11 @@ memgraphCypherKeyword : cypherKeyword | HEADER | IDENTIFIED | NULLIF - | ISOLATION + | IMPORT + | INACTIVE | IN_MEMORY_ANALYTICAL | IN_MEMORY_TRANSACTIONAL + | ISOLATION | KAFKA | LABELS | LEVEL @@ -143,6 +147,7 @@ query : cypherQuery | transactionQueueQuery | multiDatabaseQuery | showDatabases + | edgeImportModeQuery ; authQuery : createRole @@ -475,3 +480,5 @@ useDatabase : USE DATABASE databaseName ; dropDatabase : DROP DATABASE databaseName ; showDatabases: SHOW DATABASES ; + +edgeImportModeQuery : EDGE IMPORT MODE ( ACTIVE | INACTIVE ) ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index 37b0015ce..55518cc19 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -23,6 +23,7 @@ lexer grammar MemgraphCypherLexer ; import CypherLexer ; +ACTIVE : A C T I V E ; AFTER : A F T E R ; ALTER : A L T E R ; ANALYZE : A N A L Y Z E ; @@ -55,6 +56,7 @@ DIRECTORY : D I R E C T O R Y ; DROP : D R O P ; DUMP : D U M P ; DURABILITY : D U R A B I L I T Y ; +EDGE : E D G E ; EDGE_TYPES : E D G E UNDERSCORE T Y P E S ; EXECUTE : E X E C U T E ; FOR : F O R ; @@ -69,9 +71,11 @@ GRANTS : G R A N T S ; HEADER : H E A D E R ; IDENTIFIED : I D E N T I F I E D ; IGNORE : I G N O R E ; -ISOLATION : I S O L A T I O N ; +IMPORT : I M P O R T ; +INACTIVE : I N A C T I V E ; IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ; IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ; +ISOLATION : I S O L A T I O N ; KAFKA : K A F K A ; LABELS : L A B E L S ; LEVEL : L E V E L ; diff --git a/src/query/frontend/semantic/required_privileges.cpp b/src/query/frontend/semantic/required_privileges.cpp index d7d432410..93447498b 100644 --- a/src/query/frontend/semantic/required_privileges.cpp +++ b/src/query/frontend/semantic/required_privileges.cpp @@ -87,6 +87,8 @@ class PrivilegeExtractor : public QueryVisitor, public HierarchicalTreeVis void Visit(TransactionQueueQuery & /*transaction_queue_query*/) override {} + void Visit(EdgeImportModeQuery & /*edge_import_mode_query*/) override {} + void Visit(VersionQuery & /*version_query*/) override { AddPrivilege(AuthQuery::Privilege::STATS); } void Visit(MultiDatabaseQuery &query) override { diff --git a/src/query/frontend/stripped_lexer_constants.hpp b/src/query/frontend/stripped_lexer_constants.hpp index 775081941..9ad95e6f5 100644 --- a/src/query/frontend/stripped_lexer_constants.hpp +++ b/src/query/frontend/stripped_lexer_constants.hpp @@ -218,7 +218,7 @@ const trie::Trie kKeywords = {"union", "data", "directory", "lock", - "unlock" + "unlock", "build"}; // Unicode codepoints that are allowed at the start of the unescaped name. diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 18f387c87..6612e3f57 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -64,6 +64,7 @@ #include "spdlog/spdlog.h" #include "storage/v2/disk/storage.hpp" #include "storage/v2/edge.hpp" +#include "storage/v2/edge_import_mode.hpp" #include "storage/v2/id_types.hpp" #include "storage/v2/inmemory/storage.hpp" #include "storage/v2/property_value.hpp" @@ -2459,6 +2460,13 @@ constexpr auto ToStorageMode(const StorageModeQuery::StorageMode storage_mode) n } } +constexpr auto ToEdgeImportMode(const EdgeImportModeQuery::Status status) noexcept { + if (status == EdgeImportModeQuery::Status::ACTIVE) { + return storage::EdgeImportMode::ACTIVE; + } + return storage::EdgeImportMode::INACTIVE; +} + bool SwitchingFromInMemoryToDisk(storage::StorageMode current_mode, storage::StorageMode next_mode) { return (current_mode == storage::StorageMode::IN_MEMORY_TRANSACTIONAL || current_mode == storage::StorageMode::IN_MEMORY_ANALYTICAL) && @@ -2602,6 +2610,37 @@ PreparedQuery PrepareStorageModeQuery(ParsedQuery parsed_query, const bool in_ex RWType::NONE}; } +PreparedQuery PrepareEdgeImportModeQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, + InterpreterContext *interpreter_context) { + if (in_explicit_transaction) { + throw EdgeImportModeModificationInMulticommandTxException(); + } + + if (interpreter_context->db->GetStorageMode() != storage::StorageMode::ON_DISK_TRANSACTIONAL) { + throw EdgeImportModeQueryDisabledOnDiskStorage(); + } + + auto *edge_import_mode_query = utils::Downcast(parsed_query.query); + MG_ASSERT(edge_import_mode_query); + const auto requested_status = ToEdgeImportMode(edge_import_mode_query->status_); + + auto callback = [requested_status, interpreter_context]() -> std::function { + return [interpreter_context, requested_status] { + auto *disk_storage = static_cast(interpreter_context->db.get()); + disk_storage->SetEdgeImportMode(requested_status); + }; + }(); + + return PreparedQuery{{}, + std::move(parsed_query.required_privileges), + [callback = std::move(callback)](AnyStream * /*stream*/, + std::optional /*n*/) -> std::optional { + callback(); + return QueryHandlerResult::COMMIT; + }, + RWType::NONE}; +} + PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_explicit_transaction, InterpreterContext *interpreter_context) { if (in_explicit_transaction) { @@ -3645,6 +3684,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareShowDatabasesQuery(std::move(parsed_query), interpreter_context_, session_uuid, username_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = + PrepareEdgeImportModeQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_); } else { LOG_FATAL("Should not get here -- unknown query type!"); } diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index d3bdc5abf..3cd3a04d7 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(mg-storage-v2 STATIC inmemory/label_index.cpp inmemory/label_property_index.cpp inmemory/unique_constraints.cpp + disk/edge_import_mode_cache.cpp disk/storage.cpp disk/rocksdb_storage.cpp disk/label_index.cpp diff --git a/src/storage/v2/disk/edge_import_mode_cache.cpp b/src/storage/v2/disk/edge_import_mode_cache.cpp new file mode 100644 index 000000000..1f623601e --- /dev/null +++ b/src/storage/v2/disk/edge_import_mode_cache.cpp @@ -0,0 +1,84 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "storage/v2/disk//edge_import_mode_cache.hpp" +#include +#include "storage/v2/disk/label_property_index.hpp" +#include "storage/v2/indices/indices.hpp" +#include "storage/v2/inmemory/label_index.hpp" +#include "storage/v2/mvcc.hpp" +#include "storage/v2/storage_mode.hpp" +#include "storage/v2/transaction.hpp" +#include "utils/algorithm.hpp" +#include "utils/disk_utils.hpp" + +namespace memgraph::storage { + +EdgeImportModeCache::EdgeImportModeCache(const Config &config) + : in_memory_indices_(Indices(config, StorageMode::IN_MEMORY_TRANSACTIONAL)) {} + +InMemoryLabelIndex::Iterable EdgeImportModeCache::Vertices(LabelId label, View view, Transaction *transaction, + Constraints *constraints) const { + auto *mem_label_index = static_cast(in_memory_indices_.label_index_.get()); + return mem_label_index->Vertices(label, view, transaction, constraints); +} + +InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices( + LabelId label, PropertyId property, const std::optional> &lower_bound, + const std::optional> &upper_bound, View view, Transaction *transaction, + Constraints *constraints) const { + auto *mem_label_property_index = + static_cast(in_memory_indices_.label_property_index_.get()); + return mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, transaction, constraints); +} + +bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property, + const std::optional ¶llel_exec_info) { + auto *mem_label_property_index = + static_cast(in_memory_indices_.label_property_index_.get()); + bool res = mem_label_property_index->CreateIndex(label, property, vertices_.access(), parallel_exec_info); + if (res) { + scanned_label_properties_.insert({label, property}); + } + return res; +} + +bool EdgeImportModeCache::CreateIndex(LabelId label, + const std::optional ¶llel_exec_info) { + auto *mem_label_index = static_cast(in_memory_indices_.label_index_.get()); + bool res = mem_label_index->CreateIndex(label, vertices_.access(), parallel_exec_info); + if (res) { + scanned_labels_.insert(label); + } + return res; +} + +bool EdgeImportModeCache::VerticesWithLabelPropertyScanned(LabelId label, PropertyId property) const { + return VerticesWithLabelScanned(label) || utils::Contains(scanned_label_properties_, std::make_pair(label, property)); +} + +bool EdgeImportModeCache::VerticesWithLabelScanned(LabelId label) const { + return AllVerticesScanned() || utils::Contains(scanned_labels_, label); +} + +bool EdgeImportModeCache::AllVerticesScanned() const { return scanned_all_vertices_; } + +utils::SkipList::Accessor EdgeImportModeCache::AccessToVertices() { return vertices_.access(); } + +utils::SkipList::Accessor EdgeImportModeCache::AccessToEdges() { return edges_.access(); } + +void EdgeImportModeCache::SetScannedAllVertices() { scanned_all_vertices_ = true; } + +utils::Synchronized, utils::SpinLock> &EdgeImportModeCache::GetCommittedTransactions() { + return committed_transactions_; +} + +} // namespace memgraph::storage diff --git a/src/storage/v2/disk/edge_import_mode_cache.hpp b/src/storage/v2/disk/edge_import_mode_cache.hpp new file mode 100644 index 000000000..eacfd7b64 --- /dev/null +++ b/src/storage/v2/disk/edge_import_mode_cache.hpp @@ -0,0 +1,74 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "storage/v2/delta.hpp" +#include "storage/v2/disk/label_index.hpp" +#include "storage/v2/disk/label_property_index.hpp" +#include "storage/v2/id_types.hpp" +#include "storage/v2/indices/indices.hpp" +#include "storage/v2/inmemory/label_index.hpp" +#include "storage/v2/inmemory/label_property_index.hpp" +#include "storage/v2/transaction.hpp" +#include "storage/v2/vertex.hpp" +#include "utils/skip_list.hpp" + +namespace memgraph::storage { + +class EdgeImportModeCache final { + public: + explicit EdgeImportModeCache(const Config &config); + + EdgeImportModeCache(const EdgeImportModeCache &) = delete; + EdgeImportModeCache &operator=(const EdgeImportModeCache &) = delete; + EdgeImportModeCache(EdgeImportModeCache &&) = delete; + EdgeImportModeCache &operator=(EdgeImportModeCache &&) = delete; + ~EdgeImportModeCache() = default; + + InMemoryLabelIndex::Iterable Vertices(LabelId label, View view, Transaction *transaction, + Constraints *constraints) const; + + InMemoryLabelPropertyIndex::Iterable Vertices(LabelId label, PropertyId property, + const std::optional> &lower_bound, + const std::optional> &upper_bound, + View view, Transaction *transaction, Constraints *constraints) const; + + bool CreateIndex(LabelId label, PropertyId property, + const std::optional ¶llel_exec_info = {}); + + bool CreateIndex(LabelId label, const std::optional ¶llel_exec_info = {}); + + bool VerticesWithLabelPropertyScanned(LabelId label, PropertyId property) const; + + bool VerticesWithLabelScanned(LabelId label) const; + + bool AllVerticesScanned() const; + + utils::SkipList::Accessor AccessToVertices(); + + utils::SkipList::Accessor AccessToEdges(); + + void SetScannedAllVertices(); + + utils::Synchronized, utils::SpinLock> &GetCommittedTransactions(); + + private: + utils::SkipList vertices_; + utils::SkipList edges_; + Indices in_memory_indices_; + bool scanned_all_vertices_{false}; + std::set scanned_labels_; + std::set> scanned_label_properties_; + utils::Synchronized, utils::SpinLock> committed_transactions_; +}; + +} // namespace memgraph::storage diff --git a/src/storage/v2/disk/label_index.cpp b/src/storage/v2/disk/label_index.cpp index 810e97ba8..a8036ec33 100644 --- a/src/storage/v2/disk/label_index.cpp +++ b/src/storage/v2/disk/label_index.cpp @@ -45,8 +45,7 @@ bool CommitWithTimestamp(rocksdb::Transaction *disk_transaction, uint64_t commit } // namespace -DiskLabelIndex::DiskLabelIndex(Indices *indices, Constraints *constraints, const Config &config) - : LabelIndex(indices, constraints, config) { +DiskLabelIndex::DiskLabelIndex(Indices *indices, const Config &config) : LabelIndex(indices, config) { utils::EnsureDirOrDie(config.disk.label_index_directory); kvstore_ = std::make_unique(); kvstore_->options_.create_if_missing = true; @@ -216,4 +215,6 @@ void DiskLabelIndex::LoadIndexInfo(const std::vector &labels) { RocksDBStorage *DiskLabelIndex::GetRocksDBStorage() const { return kvstore_.get(); } +std::unordered_set DiskLabelIndex::GetInfo() const { return index_; } + } // namespace memgraph::storage diff --git a/src/storage/v2/disk/label_index.hpp b/src/storage/v2/disk/label_index.hpp index 76151ef22..d3c3dce34 100644 --- a/src/storage/v2/disk/label_index.hpp +++ b/src/storage/v2/disk/label_index.hpp @@ -9,6 +9,8 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#pragma once + #include #include @@ -21,7 +23,7 @@ namespace memgraph::storage { class DiskLabelIndex : public storage::LabelIndex { public: - DiskLabelIndex(Indices *indices, Constraints *constraints, const Config &config); + DiskLabelIndex(Indices *indices, const Config &config); [[nodiscard]] bool CreateIndex(LabelId label, const std::vector> &vertices); @@ -53,6 +55,8 @@ class DiskLabelIndex : public storage::LabelIndex { void LoadIndexInfo(const std::vector &labels); + std::unordered_set GetInfo() const; + private: utils::Synchronized>>> entries_for_deletion; std::unordered_set index_; diff --git a/src/storage/v2/disk/label_property_index.cpp b/src/storage/v2/disk/label_property_index.cpp index 6c9d25fe3..5c7f0f05d 100644 --- a/src/storage/v2/disk/label_property_index.cpp +++ b/src/storage/v2/disk/label_property_index.cpp @@ -49,8 +49,8 @@ bool CommitWithTimestamp(rocksdb::Transaction *disk_transaction, uint64_t commit } // namespace -DiskLabelPropertyIndex::DiskLabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config) - : LabelPropertyIndex(indices, constraints, config) { +DiskLabelPropertyIndex::DiskLabelPropertyIndex(Indices *indices, const Config &config) + : LabelPropertyIndex(indices, config) { utils::EnsureDirOrDie(config.disk.label_property_index_directory); kvstore_ = std::make_unique(); kvstore_->options_.create_if_missing = true; @@ -223,4 +223,6 @@ void DiskLabelPropertyIndex::LoadIndexInfo(const std::vector &keys) RocksDBStorage *DiskLabelPropertyIndex::GetRocksDBStorage() const { return kvstore_.get(); } +std::set> DiskLabelPropertyIndex::GetInfo() const { return index_; } + } // namespace memgraph::storage diff --git a/src/storage/v2/disk/label_property_index.hpp b/src/storage/v2/disk/label_property_index.hpp index ba7f7aaa8..59379a6ee 100644 --- a/src/storage/v2/disk/label_property_index.hpp +++ b/src/storage/v2/disk/label_property_index.hpp @@ -9,6 +9,8 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#pragma once + #include "storage/v2/disk/rocksdb_storage.hpp" #include "storage/v2/indices/label_property_index.hpp" @@ -20,7 +22,7 @@ using ParallelizedIndexCreationInfo = class DiskLabelPropertyIndex : public storage::LabelPropertyIndex { public: - DiskLabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config); + DiskLabelPropertyIndex(Indices *indices, const Config &config); bool CreateIndex(LabelId label, PropertyId property, const std::vector> &vertices); @@ -61,6 +63,8 @@ class DiskLabelPropertyIndex : public storage::LabelPropertyIndex { void LoadIndexInfo(const std::vector &keys); + std::set> GetInfo() const; + private: utils::Synchronized>>>> entries_for_deletion; diff --git a/src/storage/v2/disk/rocksdb_storage.cpp b/src/storage/v2/disk/rocksdb_storage.cpp index b2803b464..fefbe36a9 100644 --- a/src/storage/v2/disk/rocksdb_storage.cpp +++ b/src/storage/v2/disk/rocksdb_storage.cpp @@ -10,6 +10,7 @@ // licenses/APL.txt. #include "rocksdb_storage.hpp" + #include #include "utils/rocksdb_serialization.hpp" @@ -80,15 +81,6 @@ int ComparatorWithU64TsImpl::CompareTimestamp(const rocksdb::Slice &ts1, const r return 0; } -DiskEdgeKey::DiskEdgeKey(storage::EdgeAccessor *edge_acc) { - auto from_gid = utils::SerializeIdType(edge_acc->FromVertex().Gid()); - auto to_gid = utils::SerializeIdType(edge_acc->ToVertex().Gid()); - auto edge_type = utils::SerializeIdType(edge_acc->EdgeType()); - auto edge_gid = utils::SerializeIdType(edge_acc->Gid()); - - key = fmt::format("{}|{}|{}|{}|{}", from_gid, to_gid, utils::outEdgeDirection, edge_type, edge_gid); -} - DiskEdgeKey::DiskEdgeKey(storage::Gid src_vertex_gid, storage::Gid dest_vertex_gid, storage::EdgeTypeId edge_type_id, const storage::EdgeRef edge_ref, bool properties_on_edges) { auto from_gid = utils::SerializeIdType(src_vertex_gid); @@ -105,6 +97,10 @@ DiskEdgeKey::DiskEdgeKey(storage::Gid src_vertex_gid, storage::Gid dest_vertex_g key = fmt::format("{}|{}|{}|{}|{}", from_gid, to_gid, utils::outEdgeDirection, edge_type, edge_gid); } +DiskEdgeKey::DiskEdgeKey(const ModifiedEdgeInfo &edge_info, bool properties_on_edges) + : DiskEdgeKey(edge_info.src_vertex_gid, edge_info.dest_vertex_gid, edge_info.edge_type_id, edge_info.edge_ref, + properties_on_edges) {} + std::string DiskEdgeKey::GetVertexOutGid() const { return key.substr(0, key.find('|')); } std::string DiskEdgeKey::GetVertexInGid() const { diff --git a/src/storage/v2/disk/rocksdb_storage.hpp b/src/storage/v2/disk/rocksdb_storage.hpp index f3e05a44c..b33f49feb 100644 --- a/src/storage/v2/disk/rocksdb_storage.hpp +++ b/src/storage/v2/disk/rocksdb_storage.hpp @@ -18,9 +18,10 @@ #include #include -#include "storage/v2/edge_accessor.hpp" #include "storage/v2/edge_direction.hpp" +#include "storage/v2/edge_ref.hpp" #include "storage/v2/id_types.hpp" +#include "storage/v2/modified_edge.hpp" #include "storage/v2/property_store.hpp" #include "utils/logging.hpp" @@ -79,14 +80,14 @@ class ComparatorWithU64TsImpl : public rocksdb::Comparator { struct DiskEdgeKey { DiskEdgeKey(const std::string_view keyView) : key(keyView) {} - DiskEdgeKey(EdgeAccessor *edge_acc); - /// @tparam src_vertex_gid, dest_vertex_gid: Gid of the source and destination vertices /// @tparam edge_type_id: EdgeTypeId of the edge /// @tparam edge_ref: Edge to be serialized DiskEdgeKey(Gid src_vertex_gid, storage::Gid dest_vertex_gid, storage::EdgeTypeId edge_type_id, const EdgeRef edge_ref, bool properties_on_edges); + DiskEdgeKey(const ModifiedEdgeInfo &edge_info, bool properties_on_edges); + std::string GetSerializedKey() const { return key; } std::string GetVertexOutGid() const; diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index 5da3eb2ba..8f2c160fd 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -10,6 +10,7 @@ // licenses/APL.txt. #include +#include #include #include #include @@ -28,13 +29,17 @@ #include "kvstore/kvstore.hpp" #include "spdlog/spdlog.h" #include "storage/v2/constraints/unique_constraints.hpp" +#include "storage/v2/delta.hpp" +#include "storage/v2/disk/edge_import_mode_cache.hpp" #include "storage/v2/disk/label_index.hpp" #include "storage/v2/disk/label_property_index.hpp" #include "storage/v2/disk/rocksdb_storage.hpp" #include "storage/v2/disk/storage.hpp" #include "storage/v2/disk/unique_constraints.hpp" -#include "storage/v2/edge.hpp" +#include "storage/v2/edge_import_mode.hpp" +#include "storage/v2/edge_ref.hpp" #include "storage/v2/id_types.hpp" +#include "storage/v2/modified_edge.hpp" #include "storage/v2/mvcc.hpp" #include "storage/v2/property_store.hpp" #include "storage/v2/property_value.hpp" @@ -43,6 +48,7 @@ #include "storage/v2/storage_error.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "storage/v2/vertices_iterable.hpp" #include "storage/v2/view.hpp" #include "utils/disk_utils.hpp" #include "utils/exceptions.hpp" @@ -76,6 +82,8 @@ constexpr const char *label_property_index_str = "label_property_index"; constexpr const char *existence_constraints_str = "existence_constraints"; constexpr const char *unique_constraints_str = "unique_constraints"; +/// TODO: (andi) Maybe a better way of checking would be if the first delta is DELETE_DESERIALIZED +/// then we now that the vertex has only been deserialized and nothing more has been done on it. bool VertexNeedsToBeSerialized(const Vertex &vertex) { Delta *head = vertex.delta; while (head != nullptr) { @@ -89,10 +97,6 @@ bool VertexNeedsToBeSerialized(const Vertex &vertex) { return false; } -bool VertexExistsInCache(const utils::SkipList::Accessor &accessor, Gid gid) { - return accessor.find(gid) != accessor.end(); -} - bool VertexHasLabel(const Vertex &vertex, LabelId label, Transaction *transaction, View view) { bool deleted = vertex.deleted; bool has_label = std::find(vertex.labels.begin(), vertex.labels.end(), label) != vertex.labels.end(); @@ -334,45 +338,42 @@ DiskStorage::DiskAccessor::~DiskAccessor() { } /// NOTE: This will create Delta object which will cause deletion of old key entry on the disk -std::optional DiskStorage::DiskAccessor::LoadVertexToMainMemoryCache(const std::string &key, - const std::string &value, - const std::string &ts) { +std::optional DiskStorage::DiskAccessor::LoadVertexToMainMemoryCache(std::string &&key, + std::string &&value, + std::string &&ts) { auto main_storage_accessor = vertices_.access(); storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key))); - if (VertexExistsInCache(main_storage_accessor, gid)) { + if (ObjectExistsInCache(main_storage_accessor, gid)) { return std::nullopt; } std::vector labels_id{utils::DeserializeLabelsFromMainDiskStorage(key)}; PropertyStore properties{utils::DeserializePropertiesFromMainDiskStorage(value)}; return CreateVertexFromDisk(main_storage_accessor, gid, std::move(labels_id), std::move(properties), - CreateDeleteDeserializedObjectDelta(&transaction_, key, ts)); + CreateDeleteDeserializedObjectDelta(&transaction_, std::move(key), std::move(ts))); } std::optional DiskStorage::DiskAccessor::LoadVertexToLabelIndexCache( - LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta, + std::string &&key, std::string &&value, Delta *index_delta, utils::SkipList::Accessor index_accessor) { storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key))); - if (VertexExistsInCache(index_accessor, gid)) { + if (ObjectExistsInCache(index_accessor, gid)) { return std::nullopt; } - /// TODO: (andi) I think this is now changed with one PR - std::vector labels_id{utils::DeserializeLabelsFromLabelIndexStorage(value)}; - labels_id.push_back(indexing_label); + std::vector labels_id{utils::DeserializeLabelsFromLabelIndexStorage(key, value)}; PropertyStore properties{utils::DeserializePropertiesFromLabelIndexStorage(value)}; return CreateVertexFromDisk(index_accessor, gid, std::move(labels_id), std::move(properties), index_delta); } +/// TODO: can be decoupled by providing as arguments extractor functions and delta. std::optional DiskStorage::DiskAccessor::LoadVertexToLabelPropertyIndexCache( - LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta, + std::string &&key, std::string &&value, Delta *index_delta, utils::SkipList::Accessor index_accessor) { storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key))); - if (VertexExistsInCache(index_accessor, gid)) { + if (ObjectExistsInCache(index_accessor, gid)) { return std::nullopt; } - /// TODO: (andi) I think this is now changed with one PR - std::vector labels_id{utils::DeserializeLabelsFromLabelPropertyIndexStorage(value)}; - labels_id.push_back(indexing_label); + std::vector labels_id{utils::DeserializeLabelsFromLabelPropertyIndexStorage(key, value)}; PropertyStore properties{utils::DeserializePropertiesFromLabelPropertyIndexStorage(value)}; return CreateVertexFromDisk(index_accessor, gid, std::move(labels_id), std::move(properties), index_delta); } @@ -412,11 +413,7 @@ std::optional DiskStorage::DiskAccessor::DeserializeEdge(const roc return *maybe_edge; } -VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) { - if (scanned_all_vertices_) { - return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_, - &storage_->constraints_, storage_->config_.items)); - } +void DiskStorage::DiskAccessor::LoadVerticesToMainMemoryCache() { auto *disk_storage = static_cast(storage_); rocksdb::ReadOptions ro; std::string strTs = utils::StringTimestamp(transaction_.start_timestamp); @@ -425,16 +422,159 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) { auto it = std::unique_ptr(disk_transaction_->GetIterator(ro, disk_storage->kvstore_->vertex_chandle)); for (it->SeekToFirst(); it->Valid(); it->Next()) { - // We should pass it->timestamp().ToString() instead of deserializeTimestamp + // We should pass it->timestamp().ToString() instead of "0" // This is hack until RocksDB will support timestamp() in WBWI iterator LoadVertexToMainMemoryCache(it->key().ToString(), it->value().ToString(), deserializeTimestamp); } +} + +/// TODO: how to remove this +/// TODO: When loading from disk, you can in some situations load from index rocksdb not the main one +/// TODO: send from and to as arguments and remove so many methods +void DiskStorage::DiskAccessor::LoadVerticesFromMainStorageToEdgeImportCache() { + auto *disk_storage = static_cast(storage_); + auto cache_accessor = disk_storage->edge_import_mode_cache_->AccessToVertices(); + + rocksdb::ReadOptions ro; + std::string strTs = utils::StringTimestamp(transaction_.start_timestamp); + rocksdb::Slice ts(strTs); + ro.timestamp = &ts; + auto it = + std::unique_ptr(disk_transaction_->GetIterator(ro, disk_storage->kvstore_->vertex_chandle)); + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::string key = it->key().ToString(); + std::string value = it->value().ToString(); + storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromMainDiskStorage(key))); + if (ObjectExistsInCache(cache_accessor, gid)) continue; + + std::vector labels_id{utils::DeserializeLabelsFromMainDiskStorage(key)}; + PropertyStore properties{utils::DeserializePropertiesFromMainDiskStorage(value)}; + CreateVertexFromDisk(cache_accessor, gid, std::move(labels_id), std::move(properties), + CreateDeleteDeserializedObjectDelta(&transaction_, std::move(key), deserializeTimestamp)); + } +} + +void DiskStorage::DiskAccessor::HandleMainLoadingForEdgeImportCache() { + auto *disk_storage = static_cast(storage_); + if (!disk_storage->edge_import_mode_cache_->AllVerticesScanned()) { + LoadVerticesFromMainStorageToEdgeImportCache(); + disk_storage->edge_import_mode_cache_->SetScannedAllVertices(); + } +} + +void DiskStorage::DiskAccessor::LoadVerticesFromLabelIndexStorageToEdgeImportCache(LabelId label) { + auto *disk_storage = static_cast(storage_); + auto *disk_label_index = static_cast(disk_storage->indices_.label_index_.get()); + auto disk_index_transaction = disk_label_index->CreateRocksDBTransaction(); + auto cache_accessor = disk_storage->edge_import_mode_cache_->AccessToVertices(); + + rocksdb::ReadOptions ro; + std::string strTs = utils::StringTimestamp(transaction_.start_timestamp); + rocksdb::Slice ts(strTs); + ro.timestamp = &ts; + auto it = std::unique_ptr(disk_index_transaction->GetIterator(ro)); + std::string label_prefix{utils::SerializeIdType(label)}; + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::string key = it->key().ToString(); + std::string value = it->value().ToString(); + if (key.starts_with(label_prefix)) { + storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key))); + if (ObjectExistsInCache(cache_accessor, gid)) continue; + + std::vector labels_id{utils::DeserializeLabelsFromLabelIndexStorage(key, value)}; + PropertyStore properties{utils::DeserializePropertiesFromLabelIndexStorage(value)}; + CreateVertexFromDisk(cache_accessor, gid, std::move(labels_id), std::move(properties), + CreateDeleteDeserializedObjectDelta(&transaction_, std::move(key), deserializeTimestamp)); + } + } +} + +void DiskStorage::DiskAccessor::HandleLoadingLabelForEdgeImportCache(LabelId label) { + auto *disk_storage = static_cast(storage_); + if (!disk_storage->edge_import_mode_cache_->VerticesWithLabelScanned(label)) { + LoadVerticesFromLabelIndexStorageToEdgeImportCache(label); + + if (!disk_storage->edge_import_mode_cache_->CreateIndex(label)) { + throw utils::BasicException("Failed creation of in-memory label index."); + } + } +} + +void DiskStorage::DiskAccessor::HandleLoadingLabelPropertyForEdgeImportCache(LabelId label, PropertyId property) { + auto *disk_storage = static_cast(storage_); + if (!disk_storage->edge_import_mode_cache_->VerticesWithLabelPropertyScanned(label, property)) { + LoadVerticesFromLabelPropertyIndexStorageToEdgeImportCache(label, property); + + if (!disk_storage->edge_import_mode_cache_->CreateIndex(label, property)) { + throw utils::BasicException("Failed creation of in-memory label-property index."); + } + } +} + +/// TODO: Just extract disk_label_index and disk_label_property_index +/// TODO: put it into a EdgeImportModeCache methods +void DiskStorage::DiskAccessor::LoadVerticesFromLabelPropertyIndexStorageToEdgeImportCache(LabelId label, + PropertyId property) { + auto *disk_storage = static_cast(storage_); + auto *disk_label_property_index = + static_cast(disk_storage->indices_.label_property_index_.get()); + auto disk_index_transaction = disk_label_property_index->CreateRocksDBTransaction(); + auto cache_accessor = disk_storage->edge_import_mode_cache_->AccessToVertices(); + + rocksdb::ReadOptions ro; + std::string strTs = utils::StringTimestamp(transaction_.start_timestamp); + rocksdb::Slice ts(strTs); + ro.timestamp = &ts; + auto it = std::unique_ptr(disk_index_transaction->GetIterator(ro)); + + const std::string label_property_prefix = utils::SerializeIdType(label) + "|" + utils::SerializeIdType(property); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::string key = it->key().ToString(); + std::string value = it->value().ToString(); + if (key.starts_with(label_property_prefix)) { + storage::Gid gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key))); + if (ObjectExistsInCache(cache_accessor, gid)) continue; + + std::vector labels_id{utils::DeserializeLabelsFromLabelPropertyIndexStorage(key, value)}; + PropertyStore properties{utils::DeserializePropertiesFromLabelPropertyIndexStorage(value)}; + CreateVertexFromDisk(cache_accessor, gid, std::move(labels_id), std::move(properties), + CreateDeleteDeserializedObjectDelta(&transaction_, std::move(key), deserializeTimestamp)); + } + } +} + +VerticesIterable DiskStorage::DiskAccessor::Vertices(View view) { + auto *disk_storage = static_cast(storage_); + if (disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE) { + HandleMainLoadingForEdgeImportCache(); + + return VerticesIterable(AllVerticesIterable(disk_storage->edge_import_mode_cache_->AccessToVertices(), + &transaction_, view, &storage_->indices_, &storage_->constraints_, + storage_->config_.items)); + } + if (scanned_all_vertices_) { + return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_, + &storage_->constraints_, storage_->config_.items)); + } + + LoadVerticesToMainMemoryCache(); scanned_all_vertices_ = true; return VerticesIterable(AllVerticesIterable(vertices_.access(), &transaction_, view, &storage_->indices_, &storage_->constraints_, storage_->config_.items)); } VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, View view) { + auto *disk_storage = static_cast(storage_); + + if (disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE) { + HandleLoadingLabelForEdgeImportCache(label); + + return VerticesIterable( + disk_storage->edge_import_mode_cache_->Vertices(label, view, &transaction_, &disk_storage->constraints_)); + } + index_storage_.emplace_back(std::make_unique>()); auto &indexed_vertices = index_storage_.back(); index_deltas_storage_.emplace_back(); @@ -447,60 +587,15 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, View view) { &storage_->constraints_, storage_->config_.items)); } -std::unordered_set DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelIndexCache( - LabelId label, View view, std::list &index_deltas, utils::SkipList *indexed_vertices) { - auto main_cache_acc = vertices_.access(); - std::unordered_set gids; - gids.reserve(main_cache_acc.size()); - - for (const auto &vertex : main_cache_acc) { - gids.insert(vertex.gid); - if (VertexHasLabel(vertex, label, &transaction_, view)) { - spdlog::trace("Loaded vertex with gid: {} from main index storage to label index", - utils::SerializeIdType(vertex.gid)); - uint64_t ts = utils::GetEarliestTimestamp(vertex.delta); - /// TODO: here are doing serialization and then later deserialization again -> expensive - LoadVertexToLabelIndexCache( - label, utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid), - utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties), - CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts), - indexed_vertices->access()); - } - } - return gids; -} - -void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelIndex(LabelId label, - const std::unordered_set &gids, - std::list &index_deltas, - utils::SkipList *indexed_vertices) { - auto *disk_label_index = static_cast(storage_->indices_.label_index_.get()); - auto disk_index_transaction = disk_label_index->CreateRocksDBTransaction(); - disk_index_transaction->SetReadTimestampForValidation(transaction_.start_timestamp); - - rocksdb::ReadOptions ro; - std::string strTs = utils::StringTimestamp(transaction_.start_timestamp); - rocksdb::Slice ts(strTs); - ro.timestamp = &ts; - auto index_it = std::unique_ptr(disk_index_transaction->GetIterator(ro)); - - const auto serialized_label = utils::SerializeIdType(label); - for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) { - std::string key = index_it->key().ToString(); - Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key))); - spdlog::trace("Loaded vertex with key: {} from label index storage", key); - if (key.starts_with(serialized_label) && !utils::Contains(gids, curr_gid)) { - // We should pass it->timestamp().ToString() instead of deserializeTimestamp - // This is hack until RocksDB will support timestamp() in WBWI iterator - LoadVertexToLabelIndexCache( - label, index_it->key().ToString(), index_it->value().ToString(), - CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp), - indexed_vertices->access()); - } - } -} - VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, View view) { + auto *disk_storage = static_cast(storage_); + if (disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE) { + HandleLoadingLabelPropertyForEdgeImportCache(label, property); + + return VerticesIterable(disk_storage->edge_import_mode_cache_->Vertices( + label, property, std::nullopt, std::nullopt, view, &transaction_, &disk_storage->constraints_)); + } + index_storage_.emplace_back(std::make_unique>()); auto &indexed_vertices = index_storage_.back(); index_deltas_storage_.emplace_back(); @@ -527,6 +622,118 @@ VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId p &storage_->constraints_, storage_->config_.items)); } +VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, const PropertyValue &value, + View view) { + auto *disk_storage = static_cast(storage_); + if (disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE) { + HandleLoadingLabelPropertyForEdgeImportCache(label, property); + + return VerticesIterable(disk_storage->edge_import_mode_cache_->Vertices( + label, property, utils::MakeBoundInclusive(value), utils::MakeBoundInclusive(value), view, &transaction_, + &disk_storage->constraints_)); + } + + index_storage_.emplace_back(std::make_unique>()); + auto &indexed_vertices = index_storage_.back(); + index_deltas_storage_.emplace_back(); + auto &index_deltas = index_deltas_storage_.back(); + + auto label_property_filter = [this, &value](const Vertex &vertex, LabelId label, PropertyId property, + View view) -> bool { + return VertexHasLabel(vertex, label, &transaction_, view) && + VertexHasEqualPropertyValue(vertex, property, value, &transaction_, view); + }; + + const auto gids = MergeVerticesFromMainCacheWithLabelPropertyIndexCache( + label, property, view, index_deltas, indexed_vertices.get(), label_property_filter); + + LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(label, property, gids, value, index_deltas, + indexed_vertices.get()); + + return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_, + &storage_->constraints_, storage_->config_.items)); +} + +VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, + const std::optional> &lower_bound, + const std::optional> &upper_bound, + View view) { + auto *disk_storage = static_cast(storage_); + if (disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE) { + HandleLoadingLabelPropertyForEdgeImportCache(label, property); + + return VerticesIterable(disk_storage->edge_import_mode_cache_->Vertices( + label, property, lower_bound, upper_bound, view, &transaction_, &disk_storage->constraints_)); + } + + index_storage_.emplace_back(std::make_unique>()); + auto &indexed_vertices = index_storage_.back(); + index_deltas_storage_.emplace_back(); + auto &index_deltas = index_deltas_storage_.back(); + + const auto gids = MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch( + label, property, view, lower_bound, upper_bound, index_deltas, indexed_vertices.get()); + + LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(label, property, gids, lower_bound, upper_bound, index_deltas, + indexed_vertices.get()); + + return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_, + &storage_->constraints_, storage_->config_.items)); +} + +/// TODO: (andi) This should probably go into some other class not the storage. All utils methods +std::unordered_set DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelIndexCache( + LabelId label, View view, std::list &index_deltas, utils::SkipList *indexed_vertices) { + auto main_cache_acc = vertices_.access(); + std::unordered_set gids; + gids.reserve(main_cache_acc.size()); + + for (const auto &vertex : main_cache_acc) { + gids.insert(vertex.gid); + if (VertexHasLabel(vertex, label, &transaction_, view)) { + spdlog::trace("Loaded vertex with gid: {} from main index storage to label index", + utils::SerializeIdType(vertex.gid)); + uint64_t ts = utils::GetEarliestTimestamp(vertex.delta); + /// TODO: here are doing serialization and then later deserialization again -> expensive + LoadVertexToLabelIndexCache(utils::SerializeVertexAsKeyForLabelIndex(label, vertex.gid), + utils::SerializeVertexAsValueForLabelIndex(label, vertex.labels, vertex.properties), + CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::nullopt, ts), + indexed_vertices->access()); + } + } + return gids; +} + +void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelIndex(LabelId label, + const std::unordered_set &gids, + std::list &index_deltas, + utils::SkipList *indexed_vertices) { + auto *disk_label_index = static_cast(storage_->indices_.label_index_.get()); + auto disk_index_transaction = disk_label_index->CreateRocksDBTransaction(); + disk_index_transaction->SetReadTimestampForValidation(transaction_.start_timestamp); + + rocksdb::ReadOptions ro; + std::string strTs = utils::StringTimestamp(transaction_.start_timestamp); + rocksdb::Slice ts(strTs); + ro.timestamp = &ts; + auto index_it = std::unique_ptr(disk_index_transaction->GetIterator(ro)); + + const auto serialized_label = utils::SerializeIdType(label); + for (index_it->SeekToFirst(); index_it->Valid(); index_it->Next()) { + std::string key = index_it->key().ToString(); + Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelIndexStorage(key))); + spdlog::trace("Loaded vertex with key: {} from label index storage", key); + if (key.starts_with(serialized_label) && !utils::Contains(gids, curr_gid)) { + // We should pass it->timestamp().ToString() instead of "0" + // This is hack until RocksDB will support timestamp() in WBWI iterator + LoadVertexToLabelIndexCache( + index_it->key().ToString(), index_it->value().ToString(), + CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::move(key), deserializeTimestamp), + indexed_vertices->access()); + } + } +} + std::unordered_set DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelPropertyIndexCache( LabelId label, PropertyId property, View view, std::list &index_deltas, utils::SkipList *indexed_vertices, const auto &label_property_filter) { @@ -540,10 +747,9 @@ std::unordered_set DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWit if (label_property_filter(vertex, label, property, view)) { uint64_t ts = utils::GetEarliestTimestamp(vertex.delta); LoadVertexToLabelPropertyIndexCache( - label, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid), + utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid), utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties), - CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts), - indexed_vertices->access()); + CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::nullopt, ts), indexed_vertices->access()); } } @@ -572,39 +778,16 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndex(LabelId l Gid curr_gid = Gid::FromUint(std::stoull(utils::ExtractGidFromLabelPropertyIndexStorage(key))); /// TODO: optimize if (label_property_filter(key, label_property_prefix, gids, curr_gid)) { - // We should pass it->timestamp().ToString() instead of deserializeTimestamp + // We should pass it->timestamp().ToString() instead of "0" // This is hack until RocksDB will support timestamp() in WBWI iterator LoadVertexToLabelPropertyIndexCache( - label, index_it->key().ToString(), index_it->value().ToString(), - CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp), + index_it->key().ToString(), index_it->value().ToString(), + CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::move(key), deserializeTimestamp), indexed_vertices->access()); } } } -VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, const PropertyValue &value, - View view) { - index_storage_.emplace_back(std::make_unique>()); - auto &indexed_vertices = index_storage_.back(); - index_deltas_storage_.emplace_back(); - auto &index_deltas = index_deltas_storage_.back(); - - auto label_property_filter = [this, &value](const Vertex &vertex, LabelId label, PropertyId property, - View view) -> bool { - return VertexHasLabel(vertex, label, &transaction_, view) && - VertexHasEqualPropertyValue(vertex, property, value, &transaction_, view); - }; - - const auto gids = MergeVerticesFromMainCacheWithLabelPropertyIndexCache( - label, property, view, index_deltas, indexed_vertices.get(), label_property_filter); - - LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(label, property, gids, value, index_deltas, - indexed_vertices.get()); - - return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_, - &storage_->constraints_, storage_->config_.items)); -} - void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup( LabelId label, PropertyId property, const std::unordered_set &gids, const PropertyValue &value, std::list &index_deltas, utils::SkipList *indexed_vertices) { @@ -627,35 +810,16 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexWithPointV PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value); if (key.starts_with(label_property_prefix) && !utils::Contains(gids, curr_gid) && properties.IsPropertyEqual(property, value)) { - // We should pass it->timestamp().ToString() instead of deserializeTimestamp + // We should pass it->timestamp().ToString() instead of "0" // This is hack until RocksDB will support timestamp() in WBWI iterator LoadVertexToLabelPropertyIndexCache( - label, index_it->key().ToString(), index_it->value().ToString(), - CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key, deserializeTimestamp), + index_it->key().ToString(), index_it->value().ToString(), + CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::move(key), deserializeTimestamp), indexed_vertices->access()); } } } -VerticesIterable DiskStorage::DiskAccessor::Vertices(LabelId label, PropertyId property, - const std::optional> &lower_bound, - const std::optional> &upper_bound, - View view) { - index_storage_.emplace_back(std::make_unique>()); - auto &indexed_vertices = index_storage_.back(); - index_deltas_storage_.emplace_back(); - auto &index_deltas = index_deltas_storage_.back(); - - const auto gids = MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch( - label, property, view, lower_bound, upper_bound, index_deltas, indexed_vertices.get()); - - LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(label, property, gids, lower_bound, upper_bound, index_deltas, - indexed_vertices.get()); - - return VerticesIterable(AllVerticesIterable(indexed_vertices->access(), &transaction_, view, &storage_->indices_, - &storage_->constraints_, storage_->config_.items)); -} - std::unordered_set DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch( LabelId label, PropertyId property, View view, const std::optional> &lower_bound, @@ -672,10 +836,9 @@ DiskStorage::DiskAccessor::MergeVerticesFromMainCacheWithLabelPropertyIndexCache IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) { uint64_t ts = utils::GetEarliestTimestamp(vertex.delta); LoadVertexToLabelPropertyIndexCache( - label, utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid), + utils::SerializeVertexAsKeyForLabelPropertyIndex(label, property, vertex.gid), utils::SerializeVertexAsValueForLabelPropertyIndex(label, vertex.labels, vertex.properties), - CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, std::nullopt, ts), - indexed_vertices->access()); + CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::nullopt, ts), indexed_vertices->access()); } } return gids; @@ -705,16 +868,16 @@ void DiskStorage::DiskAccessor::LoadVerticesFromDiskLabelPropertyIndexForInterva /// TODO: andi this will be optimized /// TODO: couple this condition PropertyStore properties = utils::DeserializePropertiesFromLabelPropertyIndexStorage(it_value_str); - auto prop_value = properties.GetProperty(property); + PropertyValue prop_value = properties.GetProperty(property); if (!key_str.starts_with(label_property_prefix) || utils::Contains(gids, curr_gid) || !IsPropertyValueWithinInterval(prop_value, lower_bound, upper_bound)) { continue; } - // We should pass it->timestamp().ToString() instead of deserializeTimestamp + // We should pass it->timestamp().ToString() instead of "0" // This is hack until RocksDB will support timestamp() in WBWI iterator LoadVertexToLabelPropertyIndexCache( - label, index_it->key().ToString(), index_it->value().ToString(), - CreateDeleteDeserializedIndexObjectDelta(&transaction_, index_deltas, key_str, deserializeTimestamp), + index_it->key().ToString(), index_it->value().ToString(), + CreateDeleteDeserializedIndexObjectDelta(index_deltas, std::move(key_str), deserializeTimestamp), indexed_vertices->access()); } } @@ -785,6 +948,7 @@ bool DiskStorage::PersistUniqueConstraintCreation(LabelId label, const std::set< } bool DiskStorage::PersistUniqueConstraintDeletion(LabelId label, const std::set &properties) const { + /// TODO: move to rocksdb_serialization.hpp std::string entry = utils::SerializeIdType(label); for (auto property : properties) { entry += "," + utils::SerializeIdType(property); @@ -827,6 +991,26 @@ StorageInfo DiskStorage::GetInfo() const { return {vertex_count, edge_count, average_degree, utils::GetMemoryUsage(), GetDiskSpaceUsage()}; } +void DiskStorage::SetEdgeImportMode(EdgeImportMode edge_import_status) { + std::unique_lock main_guard{main_lock_}; + if (edge_import_status == edge_import_status_) { + return; + } + if (edge_import_status == EdgeImportMode::ACTIVE) { + edge_import_mode_cache_ = std::make_unique(config_); + } else { + edge_import_mode_cache_.reset(nullptr); + } + + edge_import_status_ = edge_import_status; + spdlog::trace("Edge import mode changed to: {}", EdgeImportModeToString(edge_import_status)); +} + +EdgeImportMode DiskStorage::GetEdgeImportMode() const { + std::shared_lock storage_guard_(main_lock_); + return edge_import_status_; +} + VertexAccessor DiskStorage::DiskAccessor::CreateVertex() { OOMExceptionEnabler oom_exception; auto *disk_storage = static_cast(storage_); @@ -850,9 +1034,6 @@ VertexAccessor DiskStorage::DiskAccessor::CreateVertexFromDisk(utils::SkipList &&label_ids, PropertyStore &&properties, Delta *delta) { OOMExceptionEnabler oom_exception; - auto *disk_storage = static_cast(storage_); - disk_storage->vertex_id_.store(std::max(disk_storage->vertex_id_.load(std::memory_order_acquire), gid.AsUint() + 1), - std::memory_order_release); auto [it, inserted] = accessor.insert(Vertex{gid, delta}); MG_ASSERT(inserted, "The vertex must be inserted here!"); MG_ASSERT(it != accessor.end(), "Invalid Vertex accessor!"); @@ -863,7 +1044,11 @@ VertexAccessor DiskStorage::DiskAccessor::CreateVertexFromDisk(utils::SkipList DiskStorage::DiskAccessor::FindVertex(storage::Gid gid, View view) { - auto acc = vertices_.access(); + auto *disk_storage = static_cast(storage_); + /// TODO: (andi) Abstract to a method GetActiveAccessor + auto acc = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE + ? disk_storage->edge_import_mode_cache_->AccessToVertices() + : vertices_.access(); auto vertex_it = acc.find(gid); if (vertex_it != acc.end()) { return VertexAccessor::Create(&*vertex_it, &transaction_, &storage_->indices_, &storage_->constraints_, config_, @@ -882,15 +1067,14 @@ std::optional DiskStorage::DiskAccessor::FindVertex(storage::Gid auto strTs = utils::StringTimestamp(transaction_.start_timestamp); rocksdb::Slice ts(strTs); read_opts.timestamp = &ts; - auto *disk_storage = static_cast(storage_); auto it = std::unique_ptr( disk_transaction_->GetIterator(read_opts, disk_storage->kvstore_->vertex_chandle)); for (it->SeekToFirst(); it->Valid(); it->Next()) { std::string key = it->key().ToString(); if (Gid::FromUint(std::stoull(utils::ExtractGidFromKey(key))) == gid) { - // We should pass it->timestamp().ToString() instead of deserializeTimestamp + // We should pass it->timestamp().ToString() instead of "0" // This is hack until RocksDB will support timestamp() in WBWI iterator - return LoadVertexToMainMemoryCache(key, it->value().ToString(), deserializeTimestamp); + return LoadVertexToMainMemoryCache(std::move(key), it->value().ToString(), deserializeTimestamp); } } return std::nullopt; @@ -1027,8 +1211,7 @@ void DiskStorage::DiskAccessor::PrefetchOutEdges(const VertexAccessor &vertex_ac Result DiskStorage::DiskAccessor::CreateEdgeFromDisk(const VertexAccessor *from, const VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid, const std::string_view properties, - const std::string &old_disk_key, - const std::string &read_ts) { + std::string &&old_disk_key, std::string &&read_ts) { OOMExceptionEnabler oom_exception; auto *from_vertex = from->vertex_; auto *to_vertex = to->vertex_; @@ -1036,23 +1219,31 @@ Result DiskStorage::DiskAccessor::CreateEdgeFromDisk(const VertexA if (from_vertex->deleted || to_vertex->deleted) return Error::DELETED_OBJECT; auto *disk_storage = static_cast(storage_); - disk_storage->edge_id_.store(std::max(disk_storage->edge_id_.load(std::memory_order_acquire), gid.AsUint() + 1), - std::memory_order_release); + bool edge_import_mode_active = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE; + + if (edge_import_mode_active) { + auto import_mode_edge_cache_acc = disk_storage->edge_import_mode_cache_->AccessToEdges(); + if (auto it = import_mode_edge_cache_acc.find(gid); it != import_mode_edge_cache_acc.end()) { + return EdgeAccessor(EdgeRef(&*it), edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, + &storage_->constraints_, config_); + } + } EdgeRef edge(gid); if (config_.properties_on_edges) { - auto acc = edges_.access(); - auto *delta = CreateDeleteDeserializedObjectDelta(&transaction_, old_disk_key, read_ts); + auto acc = edge_import_mode_active ? disk_storage->edge_import_mode_cache_->AccessToEdges() : edges_.access(); + auto *delta = CreateDeleteDeserializedObjectDelta(&transaction_, std::move(old_disk_key), std::move(read_ts)); auto [it, inserted] = acc.insert(Edge(gid, delta)); - MG_ASSERT(inserted, "The edge must be inserted here!"); MG_ASSERT(it != acc.end(), "Invalid Edge accessor!"); edge = EdgeRef(&*it); - if (delta) { - delta->prev.Set(&*it); - } + delta->prev.Set(&*it); edge.ptr->properties.SetBuffer(properties); } + ModifiedEdgeInfo modified_edge(Delta::Action::DELETE_DESERIALIZED_OBJECT, from_vertex->gid, to_vertex->gid, edge_type, + edge); + transaction_.AddModifiedEdge(gid, modified_edge); + from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); @@ -1065,6 +1256,7 @@ Result DiskStorage::DiskAccessor::CreateEdgeFromDisk(const VertexA Result DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) { + OOMExceptionEnabler oom_exception; auto *from_vertex = from->vertex_; auto *to_vertex = to->vertex_; @@ -1073,8 +1265,10 @@ Result DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from, auto *disk_storage = static_cast(storage_); auto gid = storage::Gid::FromUint(disk_storage->edge_id_.fetch_add(1, std::memory_order_acq_rel)); EdgeRef edge(gid); + bool edge_import_mode_active = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE; + if (config_.properties_on_edges) { - auto acc = edges_.access(); + auto acc = edge_import_mode_active ? disk_storage->edge_import_mode_cache_->AccessToEdges() : edges_.access(); auto *delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Edge(gid, delta)); MG_ASSERT(inserted, "The edge must be inserted here!"); @@ -1083,6 +1277,9 @@ Result DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from, delta->prev.Set(&*it); } + ModifiedEdgeInfo modified_edge(Delta::Action::DELETE_OBJECT, from_vertex->gid, to_vertex->gid, edge_type, edge); + transaction_.AddModifiedEdge(gid, modified_edge); + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); @@ -1135,6 +1332,8 @@ Result> DiskStorage::DiskAccessor::DeleteEdge(EdgeAc const DiskEdgeKey disk_edge_key(from_vertex->gid, to_vertex->gid, edge_type, edge_ref, config_.properties_on_edges); edges_to_delete_.emplace(disk_edge_key.GetSerializedKey()); + transaction_.RemoveModifiedEdge(edge->Gid()); + if (config_.properties_on_edges) { MG_ASSERT((op1 && op2), "Invalid database state!"); } else { @@ -1185,24 +1384,20 @@ bool DiskStorage::DiskAccessor::WriteVertexToDisk(const Vertex &vertex) { } /// TODO: at which storage naming -bool DiskStorage::DiskAccessor::WriteEdgeToDisk(const EdgeRef edge, const std::string &serializedEdgeKey) { +bool DiskStorage::DiskAccessor::WriteEdgeToDisk(const std::string &serialized_edge_key, + const std::string &serialized_edge_value) { MG_ASSERT(commit_timestamp_.has_value(), "Writing vertex to disk but commit timestamp not set."); auto *disk_storage = static_cast(storage_); - rocksdb::Status status; - if (config_.properties_on_edges) { - status = disk_transaction_->Put(disk_storage->kvstore_->edge_chandle, serializedEdgeKey, - utils::SerializeProperties(edge.ptr->properties)); - } else { - status = disk_transaction_->Put(disk_storage->kvstore_->edge_chandle, serializedEdgeKey, ""); - } + rocksdb::Status status = + disk_transaction_->Put(disk_storage->kvstore_->edge_chandle, serialized_edge_key, serialized_edge_value); if (status.ok()) { - spdlog::trace("rocksdb: Saved edge with key {} and ts {}", serializedEdgeKey, *commit_timestamp_); + spdlog::trace("rocksdb: Saved edge with key {} and ts {}", serialized_edge_key, *commit_timestamp_); } else if (status.IsBusy()) { spdlog::error("rocksdb: Edge with key {} and ts {} was changed and committed in another transaction", - serializedEdgeKey, *commit_timestamp_); + serialized_edge_key, *commit_timestamp_); return false; } else { - spdlog::error("rocksdb: Failed to save edge with key {} and ts {}", serializedEdgeKey, *commit_timestamp_); + spdlog::error("rocksdb: Failed to save edge with key {} and ts {}", serialized_edge_key, *commit_timestamp_); return false; } return true; @@ -1255,72 +1450,84 @@ DiskStorage::DiskAccessor::CheckVertexConstraintsBeforeCommit( return {}; } -[[nodiscard]] utils::BasicResult DiskStorage::DiskAccessor::FlushMainMemoryCache() { - auto vertex_acc = vertices_.access(); - - std::vector> unique_storage; +[[nodiscard]] utils::BasicResult DiskStorage::DiskAccessor::FlushVertices( + const auto &vertex_acc, std::vector> &unique_storage) { auto *disk_unique_constraints = static_cast(storage_->constraints_.unique_constraints_.get()); auto *disk_label_index = static_cast(storage_->indices_.label_index_.get()); auto *disk_label_property_index = static_cast(storage_->indices_.label_property_index_.get()); - /// TODO: andi I don't like that std::optional is used for checking errors but that's how it was before, refactor! - for (Vertex &vertex : vertex_acc) { - if (VertexNeedsToBeSerialized(vertex)) { - if (auto check_result = CheckVertexConstraintsBeforeCommit(vertex, unique_storage); check_result.HasError()) { - return check_result.GetError(); - } + for (const Vertex &vertex : vertex_acc) { + if (!VertexNeedsToBeSerialized(vertex)) { + continue; + } + if (auto check_result = CheckVertexConstraintsBeforeCommit(vertex, unique_storage); check_result.HasError()) { + return check_result.GetError(); + } - /// TODO: what if something is changed and then deleted and how does this work connected to indices and - /// constraints - if (vertex.deleted) { - continue; - } + if (vertex.deleted) { + continue; + } - /// TODO: expose temporal coupling - /// NOTE: this deletion has to come before writing, otherwise RocksDB thinks that all entries are deleted - /// TODO: This has to deal with index storage if read from index cache - if (auto maybe_old_disk_key = utils::GetOldDiskKeyOrNull(vertex.delta); maybe_old_disk_key.has_value()) { - if (!DeleteVertexFromDisk(maybe_old_disk_key.value())) { - return StorageDataManipulationError{SerializationError{}}; - } - } - - if (!WriteVertexToDisk(vertex)) { - return StorageDataManipulationError{SerializationError{}}; - } - - if (!disk_unique_constraints->SyncVertexToUniqueConstraintsStorage(vertex, *commit_timestamp_) || - !disk_label_index->SyncVertexToLabelIndexStorage(vertex, *commit_timestamp_) || - !disk_label_property_index->SyncVertexToLabelPropertyIndexStorage(vertex, *commit_timestamp_)) { + /// NOTE: this deletion has to come before writing, otherwise RocksDB thinks that all entries are deleted + if (auto maybe_old_disk_key = utils::GetOldDiskKeyOrNull(vertex.delta); maybe_old_disk_key.has_value()) { + if (!DeleteVertexFromDisk(maybe_old_disk_key.value())) { return StorageDataManipulationError{SerializationError{}}; } } - for (auto &edge_entry : vertex.out_edges) { - EdgeRef edge = std::get<2>(edge_entry); - const DiskEdgeKey src_dest_key(vertex.gid, std::get<1>(edge_entry)->gid, std::get<0>(edge_entry), edge, - config_.properties_on_edges); + if (!WriteVertexToDisk(vertex)) { + return StorageDataManipulationError{SerializationError{}}; + } - /// TODO: expose temporal coupling - /// NOTE: this deletion has to come before writing, otherwise RocksDB thinks that all entries are deleted - if (config_.properties_on_edges) { - if (auto maybe_old_disk_key = utils::GetOldDiskKeyOrNull(edge.ptr->delta); maybe_old_disk_key.has_value()) { - if (!DeleteEdgeFromDisk(maybe_old_disk_key.value())) { - return StorageDataManipulationError{SerializationError{}}; - } - } - } - - if (!WriteEdgeToDisk(edge, src_dest_key.GetSerializedKey())) { - return StorageDataManipulationError{SerializationError{}}; - } - - /// TODO: what if edge has already been deleted + if (!disk_unique_constraints->SyncVertexToUniqueConstraintsStorage(vertex, *commit_timestamp_) || + !disk_label_index->SyncVertexToLabelIndexStorage(vertex, *commit_timestamp_) || + !disk_label_property_index->SyncVertexToLabelPropertyIndexStorage(vertex, *commit_timestamp_)) { + return StorageDataManipulationError{SerializationError{}}; } } + return {}; +} + +[[nodiscard]] utils::BasicResult +DiskStorage::DiskAccessor::ClearDanglingVertices() { + auto *disk_unique_constraints = + static_cast(storage_->constraints_.unique_constraints_.get()); + auto *disk_label_index = static_cast(storage_->indices_.label_index_.get()); + auto *disk_label_property_index = + static_cast(storage_->indices_.label_property_index_.get()); + + if (!disk_unique_constraints->DeleteVerticesWithRemovedConstraintLabel(transaction_.start_timestamp, + *commit_timestamp_) || + !disk_label_index->DeleteVerticesWithRemovedIndexingLabel(transaction_.start_timestamp, *commit_timestamp_) || + !disk_label_property_index->DeleteVerticesWithRemovedIndexingLabel(transaction_.start_timestamp, + *commit_timestamp_)) { + return StorageDataManipulationError{SerializationError{}}; + } + return {}; +} + +[[nodiscard]] utils::BasicResult DiskStorage::DiskAccessor::FlushIndexCache() { + std::vector> unique_storage; + + for (const auto &vec : index_storage_) { + if (auto vertices_res = FlushVertices(vec->access(), unique_storage); vertices_res.HasError()) { + return vertices_res.GetError(); + } + } + + return {}; +} + +[[nodiscard]] utils::BasicResult DiskStorage::DiskAccessor::FlushDeletedVertices() { + auto *disk_unique_constraints = + static_cast(storage_->constraints_.unique_constraints_.get()); + auto *disk_label_index = static_cast(storage_->indices_.label_index_.get()); + auto *disk_label_property_index = + static_cast(storage_->indices_.label_property_index_.get()); + for (const auto &[vertex_gid, serialized_vertex_to_delete] : vertices_to_delete_) { if (!DeleteVertexFromDisk(serialized_vertex_to_delete) || !disk_unique_constraints->ClearDeletedVertex(vertex_gid, *commit_timestamp_) || @@ -1330,69 +1537,48 @@ DiskStorage::DiskAccessor::CheckVertexConstraintsBeforeCommit( } } + return {}; +} + +[[nodiscard]] utils::BasicResult DiskStorage::DiskAccessor::FlushDeletedEdges() { for (const auto &edge_to_delete : edges_to_delete_) { if (!DeleteEdgeFromDisk(edge_to_delete)) { return StorageDataManipulationError{SerializationError{}}; } } - - if (!disk_unique_constraints->DeleteVerticesWithRemovedConstraintLabel(transaction_.start_timestamp, - *commit_timestamp_) || - !disk_label_index->DeleteVerticesWithRemovedIndexingLabel(transaction_.start_timestamp, *commit_timestamp_) || - !disk_label_property_index->DeleteVerticesWithRemovedIndexingLabel(transaction_.start_timestamp, - *commit_timestamp_)) { - return StorageDataManipulationError{SerializationError{}}; - } - return {}; } -/// TODO: I think unique_storage is not needed here -[[nodiscard]] utils::BasicResult DiskStorage::DiskAccessor::FlushIndexCache() { - std::vector> unique_storage; - auto *disk_unique_constraints = - static_cast(storage_->constraints_.unique_constraints_.get()); - auto *disk_label_index = static_cast(storage_->indices_.label_index_.get()); - auto *disk_label_property_index = - static_cast(storage_->indices_.label_property_index_.get()); +[[nodiscard]] utils::BasicResult DiskStorage::DiskAccessor::FlushModifiedEdges( + const auto &edge_acc) { + for (const auto &modified_edge : transaction_.modified_edges_) { + const storage::Gid &gid = modified_edge.first; + const Delta::Action action = modified_edge.second.delta_action; + DiskEdgeKey disk_edge_key(modified_edge.second, config_.properties_on_edges); + const std::string &ser_edge_key = disk_edge_key.GetSerializedKey(); - for (const auto &vec : index_storage_) { - auto vertex_acc = vec->access(); - for (Vertex &vertex : vertex_acc) { - if (auto check_result = CheckVertexConstraintsBeforeCommit(vertex, unique_storage); check_result.HasError()) { - return check_result.GetError(); + if (!config_.properties_on_edges) { + /// If the object was created then flush it, otherwise since properties on edges are false + /// edge wasn't modified for sure. + if (action == Delta::Action::DELETE_OBJECT && !WriteEdgeToDisk(ser_edge_key, "")) { + return StorageDataManipulationError{SerializationError{}}; } - - /// TODO: what if something is changed and then deleted - if (vertex.deleted) { - continue; - } - - if (!WriteVertexToDisk(vertex)) { + } else { + // If the delta is DELETE_OBJECT, the edge is just created so there is nothing to delete. + // If the edge was deserialized, only properties can be modified -> key stays the same as when deserialized + // so we can delete it. + if (action == Delta::Action::DELETE_DESERIALIZED_OBJECT && !DeleteEdgeFromDisk(ser_edge_key)) { return StorageDataManipulationError{SerializationError{}}; } - /// TODO: andi don't ignore the return value - if (!disk_unique_constraints->SyncVertexToUniqueConstraintsStorage(vertex, *commit_timestamp_) || - !disk_label_index->SyncVertexToLabelIndexStorage(vertex, *commit_timestamp_) || - !disk_label_property_index->SyncVertexToLabelPropertyIndexStorage(vertex, *commit_timestamp_)) { + const auto &edge = edge_acc.find(gid); + MG_ASSERT(edge != edge_acc.end(), + "Database in invalid state, commit not possible! Please restart your DB and start the import again."); + if (!WriteEdgeToDisk(ser_edge_key, utils::SerializeProperties(edge->properties))) { return StorageDataManipulationError{SerializationError{}}; } - - for (auto &edge_entry : vertex.out_edges) { - EdgeRef edge = std::get<2>(edge_entry); - DiskEdgeKey src_dest_key(vertex.gid, std::get<1>(edge_entry)->gid, std::get<0>(edge_entry), edge, - config_.properties_on_edges); - - if (!WriteEdgeToDisk(edge, src_dest_key.GetSerializedKey())) { - return StorageDataManipulationError{SerializationError{}}; - } - - /// TODO: what if edge has already been deleted - } } } - return {}; } @@ -1450,22 +1636,53 @@ utils::BasicResult DiskStorage::DiskAccessor MG_ASSERT(!transaction_.must_abort, "The transaction can't be committed!"); auto *disk_storage = static_cast(storage_); + bool edge_import_mode_active = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE; if (transaction_.deltas.empty() || - std::all_of(transaction_.deltas.begin(), transaction_.deltas.end(), - [](const Delta &delta) { return delta.action == Delta::Action::DELETE_DESERIALIZED_OBJECT; })) { + (!edge_import_mode_active && + std::all_of(transaction_.deltas.begin(), transaction_.deltas.end(), + [](const Delta &delta) { return delta.action == Delta::Action::DELETE_DESERIALIZED_OBJECT; }))) { } else { std::unique_lock engine_guard(storage_->engine_lock_); commit_timestamp_.emplace(disk_storage->CommitTimestamp(desired_commit_timestamp)); + transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); - if (auto res = FlushMainMemoryCache(); res.HasError()) { - Abort(); - return res; - } + if (edge_import_mode_active) { + if (auto res = FlushModifiedEdges(disk_storage->edge_import_mode_cache_->AccessToEdges()); res.HasError()) { + Abort(); + return res; + } + } else { + std::vector> unique_storage; + if (auto vertices_flush_res = FlushVertices(vertices_.access(), unique_storage); vertices_flush_res.HasError()) { + Abort(); + return vertices_flush_res.GetError(); + } - if (auto res = FlushIndexCache(); res.HasError()) { - Abort(); - return res; + if (auto modified_edges_res = FlushModifiedEdges(edges_.access()); modified_edges_res.HasError()) { + Abort(); + return modified_edges_res.GetError(); + } + + if (auto del_vertices_res = FlushDeletedVertices(); del_vertices_res.HasError()) { + Abort(); + return del_vertices_res.GetError(); + } + + if (auto del_edges_res = FlushDeletedEdges(); del_edges_res.HasError()) { + Abort(); + return del_edges_res.GetError(); + } + + if (auto clear_dangling_res = ClearDanglingVertices(); clear_dangling_res.HasError()) { + Abort(); + return clear_dangling_res.GetError(); + } + + if (auto index_flush_res = FlushIndexCache(); index_flush_res.HasError()) { + Abort(); + return index_flush_res.GetError(); + } } } @@ -1609,7 +1826,16 @@ void DiskStorage::DiskAccessor::Abort() { } void DiskStorage::DiskAccessor::FinalizeTransaction() { + /// TODO: (andi) Check the login in InMemoryStorage. if (commit_timestamp_) { + auto *disk_storage = static_cast(storage_); + bool edge_import_mode_active = disk_storage->edge_import_status_ == EdgeImportMode::ACTIVE; + + if (edge_import_mode_active) { + auto &committed_transactions = disk_storage->edge_import_mode_cache_->GetCommittedTransactions(); + committed_transactions.WithLock( + [&](auto &committed_txs) { committed_txs.emplace_back(std::move(transaction_)); }); + } commit_timestamp_.reset(); } } @@ -1773,13 +1999,16 @@ Transaction DiskStorage::CreateTransaction(IsolationLevel isolation_level, Stora /// `timestamp`) below. uint64_t transaction_id = 0; uint64_t start_timestamp = 0; + bool edge_import_mode_active{false}; { std::lock_guard guard(engine_lock_); transaction_id = transaction_id_++; /// TODO: when we introduce replication to the disk storage, take care of start_timestamp start_timestamp = timestamp_++; + edge_import_mode_active = edge_import_status_ == EdgeImportMode::ACTIVE; } - return {transaction_id, start_timestamp, isolation_level, storage_mode}; + + return {transaction_id, start_timestamp, isolation_level, storage_mode, edge_import_mode_active}; } uint64_t DiskStorage::CommitTimestamp(const std::optional desired_commit_timestamp) { diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index b01849b12..1a15d2340 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -13,7 +13,9 @@ #include "kvstore/kvstore.hpp" #include "storage/v2/constraints/constraint_violation.hpp" +#include "storage/v2/disk/edge_import_mode_cache.hpp" #include "storage/v2/disk/rocksdb_storage.hpp" +#include "storage/v2/edge_import_mode.hpp" #include "storage/v2/id_types.hpp" #include "storage/v2/isolation_level.hpp" #include "storage/v2/property_store.hpp" @@ -44,6 +46,55 @@ class DiskStorage final : public Storage { explicit DiskAccessor(DiskStorage *storage, IsolationLevel isolation_level, StorageMode storage_mode); + /// TODO: const methods? + void LoadVerticesToMainMemoryCache(); + + void LoadVerticesFromMainStorageToEdgeImportCache(); + + void HandleMainLoadingForEdgeImportCache(); + + void LoadVerticesFromLabelIndexStorageToEdgeImportCache(LabelId label); + + void HandleLoadingLabelForEdgeImportCache(LabelId label); + + void LoadVerticesFromLabelPropertyIndexStorageToEdgeImportCache(LabelId label, PropertyId property); + + void HandleLoadingLabelPropertyForEdgeImportCache(LabelId label, PropertyId property); + + std::unordered_set MergeVerticesFromMainCacheWithLabelIndexCache(LabelId label, View view, + std::list &index_deltas, + utils::SkipList *indexed_vertices); + + void LoadVerticesFromDiskLabelIndex(LabelId label, const std::unordered_set &gids, + std::list &index_deltas, utils::SkipList *indexed_vertices); + + std::unordered_set MergeVerticesFromMainCacheWithLabelPropertyIndexCache( + LabelId label, PropertyId property, View view, std::list &index_deltas, + utils::SkipList *indexed_vertices, const auto &label_property_filter); + + void LoadVerticesFromDiskLabelPropertyIndex(LabelId label, PropertyId property, + const std::unordered_set &gids, + std::list &index_deltas, + utils::SkipList *indexed_vertices, + const auto &label_property_filter); + + void LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(LabelId label, PropertyId property, + const std::unordered_set &gids, + const PropertyValue &value, + std::list &index_deltas, + utils::SkipList *indexed_vertices); + + std::unordered_set MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch( + LabelId label, PropertyId property, View view, const std::optional> &lower_bound, + const std::optional> &upper_bound, std::list &index_deltas, + utils::SkipList *indexed_vertices); + + void LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch( + LabelId label, PropertyId property, const std::unordered_set &gids, + const std::optional> &lower_bound, + const std::optional> &upper_bound, std::list &index_deltas, + utils::SkipList *indexed_vertices); + public: DiskAccessor(const DiskAccessor &) = delete; DiskAccessor &operator=(const DiskAccessor &) = delete; @@ -61,48 +112,14 @@ class DiskStorage final : public Storage { VerticesIterable Vertices(LabelId label, View view) override; - std::unordered_set MergeVerticesFromMainCacheWithLabelIndexCache(LabelId label, View view, - std::list &index_deltas, - utils::SkipList *indexed_vertices); - - void LoadVerticesFromDiskLabelIndex(LabelId label, const std::unordered_set &gids, - std::list &index_deltas, utils::SkipList *indexed_vertices); - VerticesIterable Vertices(LabelId label, PropertyId property, View view) override; - std::unordered_set MergeVerticesFromMainCacheWithLabelPropertyIndexCache( - LabelId label, PropertyId property, View view, std::list &index_deltas, - utils::SkipList *indexed_vertices, const auto &label_property_filter); - - void LoadVerticesFromDiskLabelPropertyIndex(LabelId label, PropertyId property, - const std::unordered_set &gids, - std::list &index_deltas, - utils::SkipList *indexed_vertices, - const auto &label_property_filter); - VerticesIterable Vertices(LabelId label, PropertyId property, const PropertyValue &value, View view) override; - void LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(LabelId label, PropertyId property, - const std::unordered_set &gids, - const PropertyValue &value, - std::list &index_deltas, - utils::SkipList *indexed_vertices); - VerticesIterable Vertices(LabelId label, PropertyId property, const std::optional> &lower_bound, const std::optional> &upper_bound, View view) override; - std::unordered_set MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch( - LabelId label, PropertyId property, View view, const std::optional> &lower_bound, - const std::optional> &upper_bound, std::list &index_deltas, - utils::SkipList *indexed_vertices); - - void LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch( - LabelId label, PropertyId property, const std::unordered_set &gids, - const std::optional> &lower_bound, - const std::optional> &upper_bound, std::list &index_deltas, - utils::SkipList *indexed_vertices); - uint64_t ApproximateVertexCount() const override; uint64_t ApproximateVertexCount(LabelId /*label*/) const override { return 10; } @@ -200,14 +217,13 @@ class DiskStorage final : public Storage { void FinalizeTransaction() override; std::optional LoadVertexToLabelIndexCache( - LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta, + std::string &&key, std::string &&value, Delta *index_delta, utils::SkipList::Accessor index_accessor); - std::optional LoadVertexToMainMemoryCache(const std::string &key, const std::string &value, - const std::string &ts); - + std::optional LoadVertexToMainMemoryCache(std::string &&key, std::string &&value, + std::string &&ts); std::optional LoadVertexToLabelPropertyIndexCache( - LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta, + std::string &&key, std::string &&value, Delta *index_delta, utils::SkipList::Accessor index_accessor); std::optional DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value, @@ -222,31 +238,41 @@ class DiskStorage final : public Storage { void PrefetchEdges(const VertexAccessor &vertex_acc, EdgeDirection edge_direction); Result CreateEdgeFromDisk(const VertexAccessor *from, const VertexAccessor *to, EdgeTypeId edge_type, - storage::Gid gid, std::string_view properties, - const std::string &old_disk_key, const std::string &ts); + storage::Gid gid, std::string_view properties, std::string &&old_disk_key, + std::string &&ts); /// Flushes vertices and edges to the disk with the commit timestamp. /// At the time of calling, the commit_timestamp_ must already exist. /// After this method, the vertex and edge caches are cleared. - [[nodiscard]] utils::BasicResult FlushMainMemoryCache(); [[nodiscard]] utils::BasicResult FlushIndexCache(); + [[nodiscard]] utils::BasicResult FlushDeletedVertices(); + + [[nodiscard]] utils::BasicResult FlushDeletedEdges(); + + [[nodiscard]] utils::BasicResult FlushVertices( + const auto &vertex_acc, std::vector> &unique_storage); + + [[nodiscard]] utils::BasicResult FlushModifiedEdges(const auto &edge_acc); + + [[nodiscard]] utils::BasicResult ClearDanglingVertices(); + [[nodiscard]] utils::BasicResult CheckVertexConstraintsBeforeCommit( const Vertex &vertex, std::vector> &unique_storage) const; bool WriteVertexToDisk(const Vertex &vertex); - bool WriteEdgeToDisk(EdgeRef edge, const std::string &serializedEdgeKey); + bool WriteEdgeToDisk(const std::string &serialized_edge_key, const std::string &serialized_edge_value); bool DeleteVertexFromDisk(const std::string &vertex); bool DeleteEdgeFromDisk(const std::string &edge); /// Main storage - utils::SkipList vertices_; - std::vector>> index_storage_; + utils::SkipList vertices_; + std::vector>> index_storage_; /// We need them because query context for indexed reading is cleared after the query is done not after the /// transaction is done std::vector> index_deltas_storage_; - utils::SkipList edges_; + utils::SkipList edges_; Config::Items config_; std::unordered_set edges_to_delete_; std::vector> vertices_to_delete_; @@ -290,6 +316,10 @@ class DiskStorage final : public Storage { Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) override; + void SetEdgeImportMode(EdgeImportMode edge_import_status); + + EdgeImportMode GetEdgeImportMode() const; + private: void LoadIndexInfoIfExists() const; @@ -339,9 +369,12 @@ class DiskStorage final : public Storage { void FreeMemory(std::unique_lock /*lock*/) override {} + void EstablishNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); } + uint64_t CommitTimestamp(std::optional desired_commit_timestamp = {}); - void EstablishNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); } + EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE}; + std::unique_ptr edge_import_mode_cache_{nullptr}; auto CreateReplicationClient(std::string name, io::network::Endpoint endpoint, replication::ReplicationMode mode, const replication::ReplicationClientConfig &config) diff --git a/src/storage/v2/edge_accessor.cpp b/src/storage/v2/edge_accessor.cpp index ddd03417a..75468e158 100644 --- a/src/storage/v2/edge_accessor.cpp +++ b/src/storage/v2/edge_accessor.cpp @@ -126,6 +126,11 @@ Result EdgeAccessor::SetProperty(PropertyId property, co CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value); edge_.ptr->properties.SetProperty(property, value); + if (transaction_->IsDiskStorage()) { + ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_); + transaction_->AddModifiedEdge(Gid(), modified_edge); + } + return std::move(current_value); } diff --git a/src/storage/v2/edge_accessor.hpp b/src/storage/v2/edge_accessor.hpp index 2b87b9838..91ffd07e8 100644 --- a/src/storage/v2/edge_accessor.hpp +++ b/src/storage/v2/edge_accessor.hpp @@ -79,9 +79,8 @@ class EdgeAccessor final { Gid Gid() const noexcept { if (config_.properties_on_edges) { return edge_.ptr->gid; - } else { - return edge_.gid; } + return edge_.gid; } bool IsCycle() const { return from_vertex_ == to_vertex_; } diff --git a/src/storage/v2/edge_import_mode.hpp b/src/storage/v2/edge_import_mode.hpp new file mode 100644 index 000000000..13a4812ac --- /dev/null +++ b/src/storage/v2/edge_import_mode.hpp @@ -0,0 +1,28 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include + +namespace memgraph::storage { + +enum class EdgeImportMode : std::uint8_t { ACTIVE, INACTIVE }; + +constexpr const char *EdgeImportModeToString(memgraph::storage::EdgeImportMode edge_import_mode) { + if (edge_import_mode == EdgeImportMode::INACTIVE) { + return "INACTIVE"; + } + return "ACTIVE"; +} + +} // namespace memgraph::storage diff --git a/src/storage/v2/indices/indices.cpp b/src/storage/v2/indices/indices.cpp index 44741463d..39010f47c 100644 --- a/src/storage/v2/indices/indices.cpp +++ b/src/storage/v2/indices/indices.cpp @@ -38,14 +38,14 @@ void Indices::UpdateOnSetProperty(PropertyId property, const PropertyValue &valu label_property_index_->UpdateOnSetProperty(property, value, vertex, tx); } -Indices::Indices(Constraints *constraints, const Config &config, StorageMode storage_mode) { - std::invoke([this, constraints, config, storage_mode]() { +Indices::Indices(const Config &config, StorageMode storage_mode) { + std::invoke([this, config, storage_mode]() { if (storage_mode == StorageMode::IN_MEMORY_TRANSACTIONAL || storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { - label_index_ = std::make_unique(this, constraints, config); - label_property_index_ = std::make_unique(this, constraints, config); + label_index_ = std::make_unique(this, config); + label_property_index_ = std::make_unique(this, config); } else { - label_index_ = std::make_unique(this, constraints, config); - label_property_index_ = std::make_unique(this, constraints, config); + label_index_ = std::make_unique(this, config); + label_property_index_ = std::make_unique(this, config); } }); } diff --git a/src/storage/v2/indices/indices.hpp b/src/storage/v2/indices/indices.hpp index c03ab96f4..9a71107cd 100644 --- a/src/storage/v2/indices/indices.hpp +++ b/src/storage/v2/indices/indices.hpp @@ -19,7 +19,7 @@ namespace memgraph::storage { struct Indices { - Indices(Constraints *constraints, const Config &config, StorageMode storage_mode); + Indices(const Config &config, StorageMode storage_mode); Indices(const Indices &) = delete; Indices(Indices &&) = delete; diff --git a/src/storage/v2/indices/label_index.hpp b/src/storage/v2/indices/label_index.hpp index 977e4e7a7..c9e131f49 100644 --- a/src/storage/v2/indices/label_index.hpp +++ b/src/storage/v2/indices/label_index.hpp @@ -19,8 +19,7 @@ namespace memgraph::storage { class LabelIndex { public: - LabelIndex(Indices *indices, Constraints *constraints, const Config &config) - : indices_(indices), constraints_(constraints), config_(config) {} + LabelIndex(Indices *indices, const Config &config) : indices_(indices), config_(config) {} LabelIndex(const LabelIndex &) = delete; LabelIndex(LabelIndex &&) = delete; @@ -44,7 +43,6 @@ class LabelIndex { protected: /// TODO: andi maybe no need for have those in abstract class if disk storage isn't using it Indices *indices_; - Constraints *constraints_; Config config_; }; diff --git a/src/storage/v2/indices/label_property_index.hpp b/src/storage/v2/indices/label_property_index.hpp index 5d9ed90e3..1db76eb3e 100644 --- a/src/storage/v2/indices/label_property_index.hpp +++ b/src/storage/v2/indices/label_property_index.hpp @@ -19,8 +19,7 @@ namespace memgraph::storage { class LabelPropertyIndex { public: - LabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config) - : indices_(indices), constraints_(constraints), config_(config) {} + LabelPropertyIndex(Indices *indices, const Config &config) : indices_(indices), config_(config) {} LabelPropertyIndex(const LabelPropertyIndex &) = delete; LabelPropertyIndex(LabelPropertyIndex &&) = delete; @@ -52,7 +51,6 @@ class LabelPropertyIndex { protected: Indices *indices_; - Constraints *constraints_; Config config_; }; diff --git a/src/storage/v2/inmemory/label_index.cpp b/src/storage/v2/inmemory/label_index.cpp index d8279f555..93e051aa0 100644 --- a/src/storage/v2/inmemory/label_index.cpp +++ b/src/storage/v2/inmemory/label_index.cpp @@ -10,12 +10,12 @@ // licenses/APL.txt. #include "storage/v2/inmemory/label_index.hpp" +#include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/indices_utils.hpp" namespace memgraph::storage { -InMemoryLabelIndex::InMemoryLabelIndex(Indices *indices, Constraints *constraints, Config config) - : LabelIndex(indices, constraints, config) {} +InMemoryLabelIndex::InMemoryLabelIndex(Indices *indices, Config config) : LabelIndex(indices, config) {} void InMemoryLabelIndex::UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, const Transaction &tx) { auto it = index_.find(added_label); @@ -151,10 +151,11 @@ void InMemoryLabelIndex::RunGC() { } } -InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(LabelId label, View view, Transaction *transaction) { +InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(LabelId label, View view, Transaction *transaction, + Constraints *constraints) { const auto it = index_.find(label); MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint()); - return {it->second.access(), label, view, transaction, indices_, constraints_, config_}; + return {it->second.access(), label, view, transaction, indices_, constraints, config_}; } void InMemoryLabelIndex::SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats) { diff --git a/src/storage/v2/inmemory/label_index.hpp b/src/storage/v2/inmemory/label_index.hpp index ca06ab79b..7b05e8c38 100644 --- a/src/storage/v2/inmemory/label_index.hpp +++ b/src/storage/v2/inmemory/label_index.hpp @@ -11,6 +11,7 @@ #pragma once +#include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/label_index.hpp" #include "storage/v2/vertex.hpp" @@ -37,7 +38,7 @@ class InMemoryLabelIndex : public storage::LabelIndex { }; public: - InMemoryLabelIndex(Indices *indices, Constraints *constraints, Config config); + InMemoryLabelIndex(Indices *indices, Config config); /// @throw std::bad_alloc void UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, const Transaction &tx) override; @@ -99,7 +100,7 @@ class InMemoryLabelIndex : public storage::LabelIndex { void RunGC(); - Iterable Vertices(LabelId label, View view, Transaction *transaction); + Iterable Vertices(LabelId label, View view, Transaction *transaction, Constraints *constraints); void SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats); diff --git a/src/storage/v2/inmemory/label_property_index.cpp b/src/storage/v2/inmemory/label_property_index.cpp index 11ae7c2d7..c3ab165fd 100644 --- a/src/storage/v2/inmemory/label_property_index.cpp +++ b/src/storage/v2/inmemory/label_property_index.cpp @@ -10,6 +10,7 @@ // licenses/APL.txt. #include "storage/v2/inmemory/label_property_index.hpp" +#include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/indices_utils.hpp" namespace memgraph::storage { @@ -32,12 +33,13 @@ bool InMemoryLabelPropertyIndex::Entry::operator<(const PropertyValue &rhs) cons bool InMemoryLabelPropertyIndex::Entry::operator==(const PropertyValue &rhs) const { return value == rhs; } -InMemoryLabelPropertyIndex::InMemoryLabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config) - : LabelPropertyIndex(indices, constraints, config) {} +InMemoryLabelPropertyIndex::InMemoryLabelPropertyIndex(Indices *indices, const Config &config) + : LabelPropertyIndex(indices, config) {} bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, utils::SkipList::Accessor vertices, const std::optional ¶llel_exec_info) { + spdlog::trace("Vertices size when creating index: {}", vertices.size()); auto create_index_seq = [this](LabelId label, PropertyId property, utils::SkipList::Accessor &vertices, std::map, utils::SkipList>::iterator it) { using IndexAccessor = decltype(it->second.access()); @@ -426,11 +428,12 @@ void InMemoryLabelPropertyIndex::RunGC() { InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices( LabelId label, PropertyId property, const std::optional> &lower_bound, - const std::optional> &upper_bound, View view, Transaction *transaction) { + const std::optional> &upper_bound, View view, Transaction *transaction, + Constraints *constraints) { auto it = index_.find({label, property}); MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint()); - return {it->second.access(), label, property, lower_bound, upper_bound, view, - transaction, indices_, constraints_, config_}; + return {it->second.access(), label, property, lower_bound, upper_bound, view, + transaction, indices_, constraints, config_}; } } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/label_property_index.hpp b/src/storage/v2/inmemory/label_property_index.hpp index 6129d4f7d..69360aa98 100644 --- a/src/storage/v2/inmemory/label_property_index.hpp +++ b/src/storage/v2/inmemory/label_property_index.hpp @@ -11,6 +11,7 @@ #pragma once +#include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/label_property_index.hpp" namespace memgraph::storage { @@ -39,7 +40,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { }; public: - InMemoryLabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config); + InMemoryLabelPropertyIndex(Indices *indices, const Config &config); /// @throw std::bad_alloc bool CreateIndex(LabelId label, PropertyId property, utils::SkipList::Accessor vertices, @@ -131,7 +132,8 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { void RunGC(); Iterable Vertices(LabelId label, PropertyId property, const std::optional> &lower_bound, - const std::optional> &upper_bound, View view, Transaction *transaction); + const std::optional> &upper_bound, View view, Transaction *transaction, + Constraints *constraints); private: std::map, utils::SkipList> index_; diff --git a/src/storage/v2/inmemory/replication/replication_server.cpp b/src/storage/v2/inmemory/replication/replication_server.cpp index c7c049997..881b66505 100644 --- a/src/storage/v2/inmemory/replication/replication_server.cpp +++ b/src/storage/v2/inmemory/replication/replication_server.cpp @@ -139,10 +139,9 @@ void InMemoryReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Bu storage_->constraints_.existence_constraints_ = std::make_unique(); storage_->constraints_.unique_constraints_ = std::make_unique(); - storage_->indices_.label_index_ = - std::make_unique(&storage_->indices_, &storage_->constraints_, storage_->config_); + storage_->indices_.label_index_ = std::make_unique(&storage_->indices_, storage_->config_); storage_->indices_.label_property_index_ = - std::make_unique(&storage_->indices_, &storage_->constraints_, storage_->config_); + std::make_unique(&storage_->indices_, storage_->config_); try { spdlog::debug("Loading snapshot"); auto &epoch = diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index acb13d829..0f35c1115 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -1083,14 +1083,14 @@ InMemoryStorage::DropUniqueConstraint(LabelId label, const std::set VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, View view) { auto *mem_label_index = static_cast(storage_->indices_.label_index_.get()); - return VerticesIterable(mem_label_index->Vertices(label, view, &transaction_)); + return VerticesIterable(mem_label_index->Vertices(label, view, &transaction_, &storage_->constraints_)); } VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, PropertyId property, View view) { auto *mem_label_property_index = static_cast(storage_->indices_.label_property_index_.get()); - return VerticesIterable( - mem_label_property_index->Vertices(label, property, std::nullopt, std::nullopt, view, &transaction_)); + return VerticesIterable(mem_label_property_index->Vertices(label, property, std::nullopt, std::nullopt, view, + &transaction_, &storage_->constraints_)); } VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, PropertyId property, @@ -1098,7 +1098,8 @@ VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, Prop auto *mem_label_property_index = static_cast(storage_->indices_.label_property_index_.get()); return VerticesIterable(mem_label_property_index->Vertices(label, property, utils::MakeBoundInclusive(value), - utils::MakeBoundInclusive(value), view, &transaction_)); + utils::MakeBoundInclusive(value), view, &transaction_, + &storage_->constraints_)); } VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices( @@ -1106,8 +1107,8 @@ VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices( const std::optional> &upper_bound, View view) { auto *mem_label_property_index = static_cast(storage_->indices_.label_property_index_.get()); - return VerticesIterable( - mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, &transaction_)); + return VerticesIterable(mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, + &transaction_, &storage_->constraints_)); } Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) { @@ -1131,7 +1132,7 @@ Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, S start_timestamp = timestamp_++; } } - return {transaction_id, start_timestamp, isolation_level, storage_mode}; + return {transaction_id, start_timestamp, isolation_level, storage_mode, false}; } template diff --git a/src/storage/v2/modified_edge.hpp b/src/storage/v2/modified_edge.hpp new file mode 100644 index 000000000..17c919c67 --- /dev/null +++ b/src/storage/v2/modified_edge.hpp @@ -0,0 +1,39 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include + +#include "storage/v2/delta.hpp" +#include "storage/v2/edge_ref.hpp" +#include "storage/v2/id_types.hpp" + +namespace memgraph::storage { + +struct ModifiedEdgeInfo { + ModifiedEdgeInfo(Delta::Action delta, Gid from_vertex, Gid to_vertex, EdgeTypeId edge_type, const EdgeRef &edge) + : delta_action(delta), + src_vertex_gid(from_vertex), + dest_vertex_gid(to_vertex), + edge_type_id(edge_type), + edge_ref(edge) {} + + Delta::Action delta_action; + Gid src_vertex_gid; + Gid dest_vertex_gid; + EdgeTypeId edge_type_id; + EdgeRef edge_ref; +}; + +using ModifiedEdgesMap = std::unordered_map; + +} // namespace memgraph::storage diff --git a/src/storage/v2/mvcc.hpp b/src/storage/v2/mvcc.hpp index 37b01a2d5..9963b6761 100644 --- a/src/storage/v2/mvcc.hpp +++ b/src/storage/v2/mvcc.hpp @@ -12,11 +12,13 @@ #pragma once #include +#include #include #include "storage/v2/property_value.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/view.hpp" +#include "utils/rocksdb_serialization.hpp" namespace memgraph::storage { @@ -68,6 +70,7 @@ inline std::size_t ApplyDeltasForRead(Transaction const *transaction, const Delt // of the database. if (view == View::OLD && ts == commit_timestamp && (cid < transaction->command_id || + // This check is used for on-disk storage. The vertex is valid only if it was deserialized in this transaction. (cid == transaction->command_id && delta->action == Delta::Action::DELETE_DESERIALIZED_OBJECT))) { break; } @@ -113,23 +116,38 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) { transaction->command_id); } +inline Delta *CreateDeleteObjectDelta(Transaction *transaction, std::list *deltas) { + if (transaction->storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { + return nullptr; + } + transaction->EnsureCommitTimestampExists(); + return &deltas->emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(), transaction->command_id); +} + /// TODO: what if in-memory analytical inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std::optional old_disk_key, - const std::string &ts) { + std::string &&ts) { // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps + transaction->EnsureCommitTimestampExists(); return &transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), std::stoull(ts), old_disk_key); } -inline Delta *CreateDeleteDeserializedIndexObjectDelta(Transaction *transaction, std::list &deltas, +inline Delta *CreateDeleteDeserializedObjectDelta(std::list *deltas, std::optional old_disk_key, + std::string &&ts) { + // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps + return &deltas->emplace_back(Delta::DeleteDeserializedObjectTag(), std::stoull(ts), old_disk_key); +} + +inline Delta *CreateDeleteDeserializedIndexObjectDelta(std::list &deltas, std::optional old_disk_key, const uint64_t ts) { return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts, old_disk_key); } /// TODO: what if in-memory analytical -inline Delta *CreateDeleteDeserializedIndexObjectDelta(Transaction *transaction, std::list &deltas, +inline Delta *CreateDeleteDeserializedIndexObjectDelta(std::list &deltas, std::optional old_disk_key, const std::string &ts) { // Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps - return CreateDeleteDeserializedIndexObjectDelta(transaction, deltas, old_disk_key, std::stoull(ts)); + return CreateDeleteDeserializedIndexObjectDelta(deltas, old_disk_key, std::stoull(ts)); } /// This function creates a delta in the transaction for the object and links diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 96e0a52c1..bda814ddb 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -49,7 +49,7 @@ Storage::Storage(Config config, StorageMode storage_mode) config_(config), isolation_level_(config.transaction.isolation_level), storage_mode_(storage_mode), - indices_(&constraints_, config, storage_mode), + indices_(config, storage_mode), constraints_(config, storage_mode), id_(config.name), replication_state_(config_.durability.restore_replication_state_on_startup, @@ -99,10 +99,10 @@ void Storage::SetStorageMode(StorageMode storage_mode) { } } -IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_level_; } - StorageMode Storage::GetStorageMode() const { return storage_mode_; } +IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_level_; } + utils::BasicResult Storage::SetIsolationLevel(IsolationLevel isolation_level) { std::unique_lock main_guard{main_lock_}; if (storage_mode_ == storage::StorageMode::IN_MEMORY_ANALYTICAL) { diff --git a/src/storage/v2/transaction.hpp b/src/storage/v2/transaction.hpp index 33ab26e4a..1c2cb3165 100644 --- a/src/storage/v2/transaction.hpp +++ b/src/storage/v2/transaction.hpp @@ -21,6 +21,7 @@ #include "storage/v2/delta.hpp" #include "storage/v2/edge.hpp" #include "storage/v2/isolation_level.hpp" +#include "storage/v2/modified_edge.hpp" #include "storage/v2/property_value.hpp" #include "storage/v2/storage_mode.hpp" #include "storage/v2/vertex.hpp" @@ -34,13 +35,14 @@ const uint64_t kTransactionInitialId = 1ULL << 63U; struct Transaction { Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level, - StorageMode storage_mode) + StorageMode storage_mode, bool edge_import_mode_active) : transaction_id(transaction_id), start_timestamp(start_timestamp), command_id(0), must_abort(false), isolation_level(isolation_level), - storage_mode(storage_mode) {} + storage_mode(storage_mode), + edge_import_mode_active(edge_import_mode_active) {} Transaction(Transaction &&other) noexcept : transaction_id(other.transaction_id.load(std::memory_order_acquire)), @@ -51,6 +53,7 @@ struct Transaction { must_abort(other.must_abort), isolation_level(other.isolation_level), storage_mode(other.storage_mode), + edge_import_mode_active(other.edge_import_mode_active), manyDeltasCache{std::move(other.manyDeltasCache)} {} Transaction(const Transaction &) = delete; @@ -59,12 +62,22 @@ struct Transaction { ~Transaction() {} + bool IsDiskStorage() const { return storage_mode == StorageMode::ON_DISK_TRANSACTIONAL; } + /// @throw std::bad_alloc if failed to create the `commit_timestamp` void EnsureCommitTimestampExists() { if (commit_timestamp != nullptr) return; commit_timestamp = std::make_unique>(transaction_id.load(std::memory_order_relaxed)); } + void AddModifiedEdge(Gid gid, ModifiedEdgeInfo modified_edge) { + if (IsDiskStorage()) { + modified_edges_.emplace(gid, modified_edge); + } + } + + void RemoveModifiedEdge(const Gid &gid) { modified_edges_.erase(gid); } + std::atomic transaction_id; uint64_t start_timestamp; // The `Transaction` object is stack allocated, but the `commit_timestamp` @@ -77,11 +90,15 @@ struct Transaction { bool must_abort; IsolationLevel isolation_level; StorageMode storage_mode; + bool edge_import_mode_active{false}; // A cache which is consistent to the current transaction_id + command_id. // Used to speedup getting info about a vertex when there is a long delta // chain involved in rebuilding that info. mutable VertexInfoCache manyDeltasCache; + + // Store modified edges GID mapped to changed Delta and serialized edge key + ModifiedEdgesMap modified_edges_; }; inline bool operator==(const Transaction &first, const Transaction &second) { diff --git a/src/storage/v2/vertex_accessor.cpp b/src/storage/v2/vertex_accessor.cpp index fcdfdf41c..b580191ac 100644 --- a/src/storage/v2/vertex_accessor.cpp +++ b/src/storage/v2/vertex_accessor.cpp @@ -15,6 +15,7 @@ #include #include +#include "query/exceptions.hpp" #include "storage/v2/edge_accessor.hpp" #include "storage/v2/id_types.hpp" #include "storage/v2/indices/indices.hpp" @@ -89,6 +90,9 @@ bool VertexAccessor::IsVisible(View view) const { } Result VertexAccessor::AddLabel(LabelId label) { + if (transaction_->edge_import_mode_active) { + throw query::WriteVertexOperationInEdgeImportModeException(); + } utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; auto guard = std::unique_lock{vertex_->lock}; @@ -109,6 +113,9 @@ Result VertexAccessor::AddLabel(LabelId label) { /// TODO: move to after update and change naming to vertex after update Result VertexAccessor::RemoveLabel(LabelId label) { + if (transaction_->edge_import_mode_active) { + throw query::WriteVertexOperationInEdgeImportModeException(); + } auto guard = std::unique_lock{vertex_->lock}; if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; @@ -224,6 +231,10 @@ Result> VertexAccessor::Labels(View view) const { } Result VertexAccessor::SetProperty(PropertyId property, const PropertyValue &value) { + if (transaction_->edge_import_mode_active) { + throw query::WriteVertexOperationInEdgeImportModeException(); + } + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; auto guard = std::unique_lock{vertex_->lock}; @@ -249,6 +260,10 @@ Result VertexAccessor::SetProperty(PropertyId property, const Pro } Result VertexAccessor::InitProperties(const std::map &properties) { + if (transaction_->edge_import_mode_active) { + throw query::WriteVertexOperationInEdgeImportModeException(); + } + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; auto guard = std::unique_lock{vertex_->lock}; @@ -268,6 +283,10 @@ Result VertexAccessor::InitProperties(const std::map>> VertexAccessor::UpdateProperties( std::map &properties) const { + if (transaction_->edge_import_mode_active) { + throw query::WriteVertexOperationInEdgeImportModeException(); + } + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; auto guard = std::unique_lock{vertex_->lock}; @@ -287,6 +306,9 @@ Result>> Vertex } Result> VertexAccessor::ClearProperties() { + if (transaction_->edge_import_mode_active) { + throw query::WriteVertexOperationInEdgeImportModeException(); + } auto guard = std::unique_lock{vertex_->lock}; if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; diff --git a/src/utils/disk_utils.hpp b/src/utils/disk_utils.hpp index 06b673419..34bc704f6 100644 --- a/src/utils/disk_utils.hpp +++ b/src/utils/disk_utils.hpp @@ -12,6 +12,7 @@ #pragma once #include "storage/v2/delta.hpp" +#include "utils/skip_list.hpp" namespace memgraph::utils { @@ -25,6 +26,11 @@ inline std::optional GetOldDiskKeyOrNull(storage::Delta *head) { return std::nullopt; } +template +inline bool ObjectExistsInCache(TSkipListAccessor &accessor, storage::Gid gid) { + return accessor.find(gid) != accessor.end(); +} + inline uint64_t GetEarliestTimestamp(storage::Delta *head) { if (head == nullptr) return 0; while (head->next != nullptr) { diff --git a/src/utils/rocksdb_serialization.hpp b/src/utils/rocksdb_serialization.hpp index 288d42606..e6da9cfe2 100644 --- a/src/utils/rocksdb_serialization.hpp +++ b/src/utils/rocksdb_serialization.hpp @@ -237,13 +237,18 @@ inline std::string SerializeVertexAsValueForLabelIndex(storage::LabelId indexing return SerializeVertexAsValueForAuxiliaryStorages(indexing_label, vertex_labels, property_store); } -inline std::vector DeserializeLabelsFromIndexStorage(const std::string &value) { - std::string labels = value.substr(0, value.find('|')); - return TransformFromStringLabels(utils::Split(labels, ",")); +inline std::vector DeserializeLabelsFromIndexStorage(const std::string &key, + const std::string &value) { + std::string labels_str{GetViewOfFirstPartOfSplit(value, '|')}; + std::vector labels{TransformFromStringLabels(utils::Split(labels_str, ","))}; + std::string indexing_label = key.substr(0, key.find('|')); + labels.emplace_back(storage::LabelId::FromUint(std::stoull(indexing_label))); + return labels; } -inline std::vector DeserializeLabelsFromLabelIndexStorage(const std::string &value) { - return DeserializeLabelsFromIndexStorage(value); +inline std::vector DeserializeLabelsFromLabelIndexStorage(const std::string &key, + const std::string &value) { + return DeserializeLabelsFromIndexStorage(key, value); } inline storage::PropertyStore DeserializePropertiesFromLabelIndexStorage(const std::string &value) { @@ -272,8 +277,9 @@ inline std::string ExtractGidFromLabelPropertyIndexStorage(const std::string &ke return std::string(GetViewOfThirdPartOfSplit(key, '|')); } -inline std::vector DeserializeLabelsFromLabelPropertyIndexStorage(const std::string &value) { - return DeserializeLabelsFromIndexStorage(value); +inline std::vector DeserializeLabelsFromLabelPropertyIndexStorage(const std::string &key, + const std::string &value) { + return DeserializeLabelsFromIndexStorage(key, value); } inline storage::PropertyStore DeserializePropertiesFromLabelPropertyIndexStorage(const std::string &value) { diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index 405200f15..bad2c3f8e 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -186,6 +186,7 @@ enum class TypeId : uint64_t { AST_CALL_SUBQUERY, AST_MULTI_DATABASE_QUERY, AST_SHOW_DATABASES, + AST_EDGE_IMPORT_MODE_QUERY, // Symbol SYMBOL, }; diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt index a9abdeb10..c0b26bc3e 100644 --- a/tests/e2e/CMakeLists.txt +++ b/tests/e2e/CMakeLists.txt @@ -61,6 +61,7 @@ add_subdirectory(load_csv) add_subdirectory(init_file_flags) add_subdirectory(analytical_mode) add_subdirectory(batched_procedures) +add_subdirectory(import_mode) add_subdirectory(concurrent_query_modules) add_subdirectory(set_properties) diff --git a/tests/e2e/import_mode/CMakeLists.txt b/tests/e2e/import_mode/CMakeLists.txt new file mode 100644 index 000000000..e316b7e82 --- /dev/null +++ b/tests/e2e/import_mode/CMakeLists.txt @@ -0,0 +1,6 @@ +function(copy_import_mode_e2e_python_files FILE_NAME) + copy_e2e_python_files(import_mode ${FILE_NAME}) +endfunction() + +copy_import_mode_e2e_python_files(common.py) +copy_import_mode_e2e_python_files(test_command.py) diff --git a/tests/e2e/import_mode/common.py b/tests/e2e/import_mode/common.py new file mode 100644 index 000000000..3a166a1d5 --- /dev/null +++ b/tests/e2e/import_mode/common.py @@ -0,0 +1,25 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import typing + +import mgclient + + +def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]: + cursor.execute(query, params) + return cursor.fetchall() + + +def connect(**kwargs) -> mgclient.Connection: + connection = mgclient.connect(host="localhost", port=7687, **kwargs) + connection.autocommit = True + return connection diff --git a/tests/e2e/import_mode/test_command.py b/tests/e2e/import_mode/test_command.py new file mode 100644 index 000000000..019d2bd28 --- /dev/null +++ b/tests/e2e/import_mode/test_command.py @@ -0,0 +1,208 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import sys + +import pytest +from common import connect, execute_and_fetch_all + + +def test_import_mode_disabled_for_in_memory_storages(): + cursor = connect().cursor() + with pytest.raises(Exception): + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + + +def test_import_mode_on_off(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + + +def test_creating_vertices(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n) RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n) RETURN n"))) == 2 + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n) RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n) RETURN n"))) == 2 + execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") + + +def test_creating_edges(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3}]->(m)") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1 + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1 + execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") + + +def test_label_index_vertices_loading(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})") + execute_and_fetch_all(cursor, "CREATE INDEX ON :User") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2 + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2 + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_label_index_edges_creation(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE INDEX ON :User") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3}]->(m)") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1 + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1 + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_label_property_index_vertices_loading(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE INDEX ON :User(id)") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) WHERE n.id IS NOT NULL RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) WHERE n.id IS NOT NULL RETURN n"))) == 2 + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) WHERE n.id IS NOT NULL RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) WHERE n.id IS NOT NULL RETURN n"))) == 2 + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_label_property_index_edges_creation(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE INDEX ON :User(id)") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3}]->(m)") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1 + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2 + assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1 + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_edge_deletion_in_edge_import_mode(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE INDEX ON :User(id)") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3}]->(m)") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1 + execute_and_fetch_all(cursor, "MATCH (n)-[r:FRIENDS {id: 3}]->(m) DELETE r") + assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 0 + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_modification_of_edge_properties_in_edge_import_mode(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE INDEX ON :User(id)") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + execute_and_fetch_all( + cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3, balance: 1000}]->(m)" + ) + assert list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN r.balance"))[0][0] == 1000 + execute_and_fetch_all(cursor, "MATCH (n)-[r:FRIENDS {id: 3}]->(m) SET r.balance = 2000") + assert list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN r.balance"))[0][0] == 2000 + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + assert list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN r.balance"))[0][0] == 2000 + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_throw_on_vertex_add_label_during_edge_import_mode(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + with pytest.raises(Exception): + execute_and_fetch_all(cursor, "MATCH (u:User {id: 1}) SET u:User:Person RETURN u") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_throw_on_vertex_remove_label_during_edge_import_mode(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE (u:User:Person {id: 1})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + with pytest.raises(Exception): + execute_and_fetch_all(cursor, "MATCH (u:User {id: 1}) REMOVE u:Person RETURN u") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_throw_on_vertex_set_property_during_edge_import_mode(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1, balance: 1000})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + with pytest.raises(Exception): + execute_and_fetch_all(cursor, "MATCH (u:User {id: 1}) SET u.balance = 2000") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_throw_on_vertex_create_vertex_during_edge_import_mode(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1, balance: 1000})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + with pytest.raises(Exception): + execute_and_fetch_all(cursor, "CREATE (m:Mother {id: 10})") + execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +def test_throw_changing_import_mode_while_in_explicit_tx(): + cursor = connect().cursor() + execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL") + execute_and_fetch_all(cursor, "CREATE (u:User {id: 1, balance: 1000})") + execute_and_fetch_all(cursor, "BEGIN") + with pytest.raises(Exception): + execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE") + execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n") + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/import_mode/workloads.yaml b/tests/e2e/import_mode/workloads.yaml new file mode 100644 index 000000000..42d131715 --- /dev/null +++ b/tests/e2e/import_mode/workloads.yaml @@ -0,0 +1,13 @@ +import_mode_cluster: &import_mode_cluster + cluster: + main: + args: ["--bolt-port", "7687", "--log-level=TRACE", "--also-log-to-stderr"] + log_file: "transaction_queue.log" + setup_queries: [] + validation_queries: [] + +workloads: + - name: "Import mode" + binary: "tests/e2e/pytest_runner.sh" + args: ["import_mode/test_command.py"] + <<: *import_mode_cluster diff --git a/tests/unit/storage_v2_wal_file.cpp b/tests/unit/storage_v2_wal_file.cpp index 6eb425042..8aeb42fc2 100644 --- a/tests/unit/storage_v2_wal_file.cpp +++ b/tests/unit/storage_v2_wal_file.cpp @@ -60,7 +60,7 @@ class DeltaGenerator final { explicit Transaction(DeltaGenerator *gen) : gen_(gen), transaction_(gen->transaction_id_++, gen->timestamp_++, memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION, - gen->storage_mode_) {} + gen->storage_mode_, false) {} public: memgraph::storage::Vertex *CreateVertex() {