Add edge import mode into the on-disk storage (#1157)

This commit is contained in:
Andi 2023-09-05 19:00:53 +02:00 committed by GitHub
parent 09fd5939da
commit b5413c6f82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 1339 additions and 399 deletions

View File

@ -158,9 +158,12 @@ class ExplicitTransactionUsageException : public QueryRuntimeException {
using QueryRuntimeException::QueryRuntimeException;
};
/**
* An exception for serialization error
*/
class WriteVertexOperationInEdgeImportModeException : public QueryException {
public:
WriteVertexOperationInEdgeImportModeException()
: QueryException("Write operations on vertices are forbidden while the edge import mode is active.") {}
};
class TransactionSerializationException : public QueryException {
public:
using QueryException::QueryException;
@ -271,6 +274,12 @@ class StorageModeModificationInMulticommandTxException : public QueryException {
: QueryException("Storage mode cannot be modified in multicommand transactions.") {}
};
class EdgeImportModeModificationInMulticommandTxException : public QueryException {
public:
EdgeImportModeModificationInMulticommandTxException()
: QueryException("Edge import mode cannot be modified in multicommand transactions.") {}
};
class CreateSnapshotInMulticommandTxException final : public QueryException {
public:
CreateSnapshotInMulticommandTxException()
@ -282,6 +291,12 @@ class CreateSnapshotDisabledOnDiskStorage final : public QueryException {
CreateSnapshotDisabledOnDiskStorage() : QueryException("In the on-disk storage mode data is already persistent.") {}
};
class EdgeImportModeQueryDisabledOnDiskStorage final : public QueryException {
public:
EdgeImportModeQueryDisabledOnDiskStorage()
: QueryException("Edge import mode is only allowed for on-disk storage mode.") {}
};
class SettingConfigInMulticommandTxException final : public QueryException {
public:
SettingConfigInMulticommandTxException()

View File

@ -285,4 +285,8 @@ constexpr utils::TypeInfo query::MultiDatabaseQuery::kType{utils::TypeId::AST_MU
constexpr utils::TypeInfo query::ShowDatabasesQuery::kType{utils::TypeId::AST_SHOW_DATABASES, "ShowDatabasesQuery",
&query::Query::kType};
constexpr utils::TypeInfo query::EdgeImportModeQuery::kType{utils::TypeId::AST_EDGE_IMPORT_MODE_QUERY,
"EdgeImportModeQuery", &query::Query::kType};
} // namespace memgraph

View File

@ -3003,6 +3003,29 @@ class ReplicationQuery : public memgraph::query::Query {
friend class AstStorage;
};
class EdgeImportModeQuery : public memgraph::query::Query {
public:
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
enum class Status { ACTIVE, INACTIVE };
EdgeImportModeQuery() = default;
DEFVISITABLE(QueryVisitor<void>);
memgraph::query::EdgeImportModeQuery::Status status_;
EdgeImportModeQuery *Clone(AstStorage *storage) const override {
auto *object = storage->Create<EdgeImportModeQuery>();
object->status_ = status_;
return object;
}
private:
friend class AstStorage;
};
class LockPathQuery : public memgraph::query::Query {
public:
static const utils::TypeInfo kType;

View File

@ -105,6 +105,7 @@ class TransactionQueueQuery;
class Exists;
class MultiDatabaseQuery;
class ShowDatabasesQuery;
class EdgeImportModeQuery;
using TreeCompositeVisitor = utils::CompositeVisitor<
SingleQuery, CypherUnion, NamedExpression, OrOperator, XorOperator, AndOperator, NotOperator, AdditionOperator,
@ -143,6 +144,6 @@ class QueryVisitor
ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery, FreeMemoryQuery, TriggerQuery,
IsolationLevelQuery, CreateSnapshotQuery, StreamQuery, SettingQuery, VersionQuery,
ShowConfigQuery, TransactionQueueQuery, StorageModeQuery, AnalyzeGraphQuery,
MultiDatabaseQuery, ShowDatabasesQuery> {};
MultiDatabaseQuery, ShowDatabasesQuery, EdgeImportModeQuery> {};
} // namespace memgraph::query

View File

@ -269,6 +269,17 @@ antlrcpp::Any CypherMainVisitor::visitReplicationQuery(MemgraphCypher::Replicati
return replication_query;
}
antlrcpp::Any CypherMainVisitor::visitEdgeImportModeQuery(MemgraphCypher::EdgeImportModeQueryContext *ctx) {
auto *edge_import_mode_query = storage_->Create<EdgeImportModeQuery>();
if (ctx->ACTIVE()) {
edge_import_mode_query->status_ = EdgeImportModeQuery::Status::ACTIVE;
} else {
edge_import_mode_query->status_ = EdgeImportModeQuery::Status::INACTIVE;
}
query_ = edge_import_mode_query;
return edge_import_mode_query;
}
antlrcpp::Any CypherMainVisitor::visitSetReplicationRole(MemgraphCypher::SetReplicationRoleContext *ctx) {
auto *replication_query = storage_->Create<ReplicationQuery>();
replication_query->action_ = ReplicationQuery::Action::SET_REPLICATION_ROLE;

View File

@ -196,6 +196,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitReplicationQuery(MemgraphCypher::ReplicationQueryContext *ctx) override;
/**
* @return EdgeImportMode*
*/
antlrcpp::Any visitEdgeImportModeQuery(MemgraphCypher::EdgeImportModeQueryContext *ctx) override;
/**
* @return ReplicationQuery*
*/

View File

@ -20,6 +20,7 @@ options { tokenVocab=MemgraphCypherLexer; }
import Cypher ;
memgraphCypherKeyword : cypherKeyword
| ACTIVE
| AFTER
| ALTER
| ANALYZE
@ -48,6 +49,7 @@ memgraphCypherKeyword : cypherKeyword
| DENY
| DROP
| DUMP
| EDGE
| EDGE_TYPES
| EXECUTE
| FOR
@ -60,9 +62,11 @@ memgraphCypherKeyword : cypherKeyword
| HEADER
| IDENTIFIED
| NULLIF
| ISOLATION
| IMPORT
| INACTIVE
| IN_MEMORY_ANALYTICAL
| IN_MEMORY_TRANSACTIONAL
| ISOLATION
| KAFKA
| LABELS
| LEVEL
@ -143,6 +147,7 @@ query : cypherQuery
| transactionQueueQuery
| multiDatabaseQuery
| showDatabases
| edgeImportModeQuery
;
authQuery : createRole
@ -475,3 +480,5 @@ useDatabase : USE DATABASE databaseName ;
dropDatabase : DROP DATABASE databaseName ;
showDatabases: SHOW DATABASES ;
edgeImportModeQuery : EDGE IMPORT MODE ( ACTIVE | INACTIVE ) ;

View File

@ -23,6 +23,7 @@ lexer grammar MemgraphCypherLexer ;
import CypherLexer ;
ACTIVE : A C T I V E ;
AFTER : A F T E R ;
ALTER : A L T E R ;
ANALYZE : A N A L Y Z E ;
@ -55,6 +56,7 @@ DIRECTORY : D I R E C T O R Y ;
DROP : D R O P ;
DUMP : D U M P ;
DURABILITY : D U R A B I L I T Y ;
EDGE : E D G E ;
EDGE_TYPES : E D G E UNDERSCORE T Y P E S ;
EXECUTE : E X E C U T E ;
FOR : F O R ;
@ -69,9 +71,11 @@ GRANTS : G R A N T S ;
HEADER : H E A D E R ;
IDENTIFIED : I D E N T I F I E D ;
IGNORE : I G N O R E ;
ISOLATION : I S O L A T I O N ;
IMPORT : I M P O R T ;
INACTIVE : I N A C T I V E ;
IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ;
IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ;
ISOLATION : I S O L A T I O N ;
KAFKA : K A F K A ;
LABELS : L A B E L S ;
LEVEL : L E V E L ;

View File

@ -87,6 +87,8 @@ class PrivilegeExtractor : public QueryVisitor<void>, public HierarchicalTreeVis
void Visit(TransactionQueueQuery & /*transaction_queue_query*/) override {}
void Visit(EdgeImportModeQuery & /*edge_import_mode_query*/) override {}
void Visit(VersionQuery & /*version_query*/) override { AddPrivilege(AuthQuery::Privilege::STATS); }
void Visit(MultiDatabaseQuery &query) override {

View File

@ -218,7 +218,7 @@ const trie::Trie kKeywords = {"union",
"data",
"directory",
"lock",
"unlock"
"unlock",
"build"};
// Unicode codepoints that are allowed at the start of the unescaped name.

View File

@ -64,6 +64,7 @@
#include "spdlog/spdlog.h"
#include "storage/v2/disk/storage.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/edge_import_mode.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/property_value.hpp"
@ -2459,6 +2460,13 @@ constexpr auto ToStorageMode(const StorageModeQuery::StorageMode storage_mode) n
}
}
constexpr auto ToEdgeImportMode(const EdgeImportModeQuery::Status status) noexcept {
if (status == EdgeImportModeQuery::Status::ACTIVE) {
return storage::EdgeImportMode::ACTIVE;
}
return storage::EdgeImportMode::INACTIVE;
}
bool SwitchingFromInMemoryToDisk(storage::StorageMode current_mode, storage::StorageMode next_mode) {
return (current_mode == storage::StorageMode::IN_MEMORY_TRANSACTIONAL ||
current_mode == storage::StorageMode::IN_MEMORY_ANALYTICAL) &&
@ -2602,6 +2610,37 @@ PreparedQuery PrepareStorageModeQuery(ParsedQuery parsed_query, const bool in_ex
RWType::NONE};
}
PreparedQuery PrepareEdgeImportModeQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
InterpreterContext *interpreter_context) {
if (in_explicit_transaction) {
throw EdgeImportModeModificationInMulticommandTxException();
}
if (interpreter_context->db->GetStorageMode() != storage::StorageMode::ON_DISK_TRANSACTIONAL) {
throw EdgeImportModeQueryDisabledOnDiskStorage();
}
auto *edge_import_mode_query = utils::Downcast<EdgeImportModeQuery>(parsed_query.query);
MG_ASSERT(edge_import_mode_query);
const auto requested_status = ToEdgeImportMode(edge_import_mode_query->status_);
auto callback = [requested_status, interpreter_context]() -> std::function<void()> {
return [interpreter_context, requested_status] {
auto *disk_storage = static_cast<storage::DiskStorage *>(interpreter_context->db.get());
disk_storage->SetEdgeImportMode(requested_status);
};
}();
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[callback = std::move(callback)](AnyStream * /*stream*/,
std::optional<int> /*n*/) -> std::optional<QueryHandlerResult> {
callback();
return QueryHandlerResult::COMMIT;
},
RWType::NONE};
}
PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
InterpreterContext *interpreter_context) {
if (in_explicit_transaction) {
@ -3645,6 +3684,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
} else if (utils::Downcast<ShowDatabasesQuery>(parsed_query.query)) {
prepared_query =
PrepareShowDatabasesQuery(std::move(parsed_query), interpreter_context_, session_uuid, username_);
} else if (utils::Downcast<EdgeImportModeQuery>(parsed_query.query)) {
prepared_query =
PrepareEdgeImportModeQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
} else {
LOG_FATAL("Should not get here -- unknown query type!");
}

View File

@ -25,6 +25,7 @@ add_library(mg-storage-v2 STATIC
inmemory/label_index.cpp
inmemory/label_property_index.cpp
inmemory/unique_constraints.cpp
disk/edge_import_mode_cache.cpp
disk/storage.cpp
disk/rocksdb_storage.cpp
disk/label_index.cpp

View File

@ -0,0 +1,84 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/disk//edge_import_mode_cache.hpp"
#include <algorithm>
#include "storage/v2/disk/label_property_index.hpp"
#include "storage/v2/indices/indices.hpp"
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/transaction.hpp"
#include "utils/algorithm.hpp"
#include "utils/disk_utils.hpp"
namespace memgraph::storage {
EdgeImportModeCache::EdgeImportModeCache(const Config &config)
: in_memory_indices_(Indices(config, StorageMode::IN_MEMORY_TRANSACTIONAL)) {}
InMemoryLabelIndex::Iterable EdgeImportModeCache::Vertices(LabelId label, View view, Transaction *transaction,
Constraints *constraints) const {
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(in_memory_indices_.label_index_.get());
return mem_label_index->Vertices(label, view, transaction, constraints);
}
InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices(
LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Transaction *transaction,
Constraints *constraints) const {
auto *mem_label_property_index =
static_cast<InMemoryLabelPropertyIndex *>(in_memory_indices_.label_property_index_.get());
return mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, transaction, constraints);
}
bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info) {
auto *mem_label_property_index =
static_cast<InMemoryLabelPropertyIndex *>(in_memory_indices_.label_property_index_.get());
bool res = mem_label_property_index->CreateIndex(label, property, vertices_.access(), parallel_exec_info);
if (res) {
scanned_label_properties_.insert({label, property});
}
return res;
}
bool EdgeImportModeCache::CreateIndex(LabelId label,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info) {
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(in_memory_indices_.label_index_.get());
bool res = mem_label_index->CreateIndex(label, vertices_.access(), parallel_exec_info);
if (res) {
scanned_labels_.insert(label);
}
return res;
}
bool EdgeImportModeCache::VerticesWithLabelPropertyScanned(LabelId label, PropertyId property) const {
return VerticesWithLabelScanned(label) || utils::Contains(scanned_label_properties_, std::make_pair(label, property));
}
bool EdgeImportModeCache::VerticesWithLabelScanned(LabelId label) const {
return AllVerticesScanned() || utils::Contains(scanned_labels_, label);
}
bool EdgeImportModeCache::AllVerticesScanned() const { return scanned_all_vertices_; }
utils::SkipList<Vertex>::Accessor EdgeImportModeCache::AccessToVertices() { return vertices_.access(); }
utils::SkipList<Edge>::Accessor EdgeImportModeCache::AccessToEdges() { return edges_.access(); }
void EdgeImportModeCache::SetScannedAllVertices() { scanned_all_vertices_ = true; }
utils::Synchronized<std::list<Transaction>, utils::SpinLock> &EdgeImportModeCache::GetCommittedTransactions() {
return committed_transactions_;
}
} // namespace memgraph::storage

View File

@ -0,0 +1,74 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "storage/v2/delta.hpp"
#include "storage/v2/disk/label_index.hpp"
#include "storage/v2/disk/label_property_index.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/indices/indices.hpp"
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/skip_list.hpp"
namespace memgraph::storage {
class EdgeImportModeCache final {
public:
explicit EdgeImportModeCache(const Config &config);
EdgeImportModeCache(const EdgeImportModeCache &) = delete;
EdgeImportModeCache &operator=(const EdgeImportModeCache &) = delete;
EdgeImportModeCache(EdgeImportModeCache &&) = delete;
EdgeImportModeCache &operator=(EdgeImportModeCache &&) = delete;
~EdgeImportModeCache() = default;
InMemoryLabelIndex::Iterable Vertices(LabelId label, View view, Transaction *transaction,
Constraints *constraints) const;
InMemoryLabelPropertyIndex::Iterable Vertices(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound,
View view, Transaction *transaction, Constraints *constraints) const;
bool CreateIndex(LabelId label, PropertyId property,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info = {});
bool CreateIndex(LabelId label, const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info = {});
bool VerticesWithLabelPropertyScanned(LabelId label, PropertyId property) const;
bool VerticesWithLabelScanned(LabelId label) const;
bool AllVerticesScanned() const;
utils::SkipList<Vertex>::Accessor AccessToVertices();
utils::SkipList<Edge>::Accessor AccessToEdges();
void SetScannedAllVertices();
utils::Synchronized<std::list<Transaction>, utils::SpinLock> &GetCommittedTransactions();
private:
utils::SkipList<Vertex> vertices_;
utils::SkipList<Edge> edges_;
Indices in_memory_indices_;
bool scanned_all_vertices_{false};
std::set<LabelId> scanned_labels_;
std::set<std::pair<LabelId, PropertyId>> scanned_label_properties_;
utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_;
};
} // namespace memgraph::storage

View File

@ -45,8 +45,7 @@ bool CommitWithTimestamp(rocksdb::Transaction *disk_transaction, uint64_t commit
} // namespace
DiskLabelIndex::DiskLabelIndex(Indices *indices, Constraints *constraints, const Config &config)
: LabelIndex(indices, constraints, config) {
DiskLabelIndex::DiskLabelIndex(Indices *indices, const Config &config) : LabelIndex(indices, config) {
utils::EnsureDirOrDie(config.disk.label_index_directory);
kvstore_ = std::make_unique<RocksDBStorage>();
kvstore_->options_.create_if_missing = true;
@ -216,4 +215,6 @@ void DiskLabelIndex::LoadIndexInfo(const std::vector<std::string> &labels) {
RocksDBStorage *DiskLabelIndex::GetRocksDBStorage() const { return kvstore_.get(); }
std::unordered_set<LabelId> DiskLabelIndex::GetInfo() const { return index_; }
} // namespace memgraph::storage

View File

@ -9,6 +9,8 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <rocksdb/iterator.h>
#include <rocksdb/utilities/transaction.h>
@ -21,7 +23,7 @@
namespace memgraph::storage {
class DiskLabelIndex : public storage::LabelIndex {
public:
DiskLabelIndex(Indices *indices, Constraints *constraints, const Config &config);
DiskLabelIndex(Indices *indices, const Config &config);
[[nodiscard]] bool CreateIndex(LabelId label, const std::vector<std::pair<std::string, std::string>> &vertices);
@ -53,6 +55,8 @@ class DiskLabelIndex : public storage::LabelIndex {
void LoadIndexInfo(const std::vector<std::string> &labels);
std::unordered_set<LabelId> GetInfo() const;
private:
utils::Synchronized<std::map<uint64_t, std::map<Gid, std::vector<LabelId>>>> entries_for_deletion;
std::unordered_set<LabelId> index_;

View File

@ -49,8 +49,8 @@ bool CommitWithTimestamp(rocksdb::Transaction *disk_transaction, uint64_t commit
} // namespace
DiskLabelPropertyIndex::DiskLabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config)
: LabelPropertyIndex(indices, constraints, config) {
DiskLabelPropertyIndex::DiskLabelPropertyIndex(Indices *indices, const Config &config)
: LabelPropertyIndex(indices, config) {
utils::EnsureDirOrDie(config.disk.label_property_index_directory);
kvstore_ = std::make_unique<RocksDBStorage>();
kvstore_->options_.create_if_missing = true;
@ -223,4 +223,6 @@ void DiskLabelPropertyIndex::LoadIndexInfo(const std::vector<std::string> &keys)
RocksDBStorage *DiskLabelPropertyIndex::GetRocksDBStorage() const { return kvstore_.get(); }
std::set<std::pair<LabelId, PropertyId>> DiskLabelPropertyIndex::GetInfo() const { return index_; }
} // namespace memgraph::storage

View File

@ -9,6 +9,8 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "storage/v2/disk/rocksdb_storage.hpp"
#include "storage/v2/indices/label_property_index.hpp"
@ -20,7 +22,7 @@ using ParallelizedIndexCreationInfo =
class DiskLabelPropertyIndex : public storage::LabelPropertyIndex {
public:
DiskLabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config);
DiskLabelPropertyIndex(Indices *indices, const Config &config);
bool CreateIndex(LabelId label, PropertyId property,
const std::vector<std::pair<std::string, std::string>> &vertices);
@ -61,6 +63,8 @@ class DiskLabelPropertyIndex : public storage::LabelPropertyIndex {
void LoadIndexInfo(const std::vector<std::string> &keys);
std::set<std::pair<LabelId, PropertyId>> GetInfo() const;
private:
utils::Synchronized<std::map<uint64_t, std::map<Gid, std::vector<std::pair<LabelId, PropertyId>>>>>
entries_for_deletion;

View File

@ -10,6 +10,7 @@
// licenses/APL.txt.
#include "rocksdb_storage.hpp"
#include <string_view>
#include "utils/rocksdb_serialization.hpp"
@ -80,15 +81,6 @@ int ComparatorWithU64TsImpl::CompareTimestamp(const rocksdb::Slice &ts1, const r
return 0;
}
DiskEdgeKey::DiskEdgeKey(storage::EdgeAccessor *edge_acc) {
auto from_gid = utils::SerializeIdType(edge_acc->FromVertex().Gid());
auto to_gid = utils::SerializeIdType(edge_acc->ToVertex().Gid());
auto edge_type = utils::SerializeIdType(edge_acc->EdgeType());
auto edge_gid = utils::SerializeIdType(edge_acc->Gid());
key = fmt::format("{}|{}|{}|{}|{}", from_gid, to_gid, utils::outEdgeDirection, edge_type, edge_gid);
}
DiskEdgeKey::DiskEdgeKey(storage::Gid src_vertex_gid, storage::Gid dest_vertex_gid, storage::EdgeTypeId edge_type_id,
const storage::EdgeRef edge_ref, bool properties_on_edges) {
auto from_gid = utils::SerializeIdType(src_vertex_gid);
@ -105,6 +97,10 @@ DiskEdgeKey::DiskEdgeKey(storage::Gid src_vertex_gid, storage::Gid dest_vertex_g
key = fmt::format("{}|{}|{}|{}|{}", from_gid, to_gid, utils::outEdgeDirection, edge_type, edge_gid);
}
DiskEdgeKey::DiskEdgeKey(const ModifiedEdgeInfo &edge_info, bool properties_on_edges)
: DiskEdgeKey(edge_info.src_vertex_gid, edge_info.dest_vertex_gid, edge_info.edge_type_id, edge_info.edge_ref,
properties_on_edges) {}
std::string DiskEdgeKey::GetVertexOutGid() const { return key.substr(0, key.find('|')); }
std::string DiskEdgeKey::GetVertexInGid() const {

View File

@ -18,9 +18,10 @@
#include <rocksdb/status.h>
#include <rocksdb/utilities/transaction_db.h>
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/edge_direction.hpp"
#include "storage/v2/edge_ref.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/modified_edge.hpp"
#include "storage/v2/property_store.hpp"
#include "utils/logging.hpp"
@ -79,14 +80,14 @@ class ComparatorWithU64TsImpl : public rocksdb::Comparator {
struct DiskEdgeKey {
DiskEdgeKey(const std::string_view keyView) : key(keyView) {}
DiskEdgeKey(EdgeAccessor *edge_acc);
/// @tparam src_vertex_gid, dest_vertex_gid: Gid of the source and destination vertices
/// @tparam edge_type_id: EdgeTypeId of the edge
/// @tparam edge_ref: Edge to be serialized
DiskEdgeKey(Gid src_vertex_gid, storage::Gid dest_vertex_gid, storage::EdgeTypeId edge_type_id,
const EdgeRef edge_ref, bool properties_on_edges);
DiskEdgeKey(const ModifiedEdgeInfo &edge_info, bool properties_on_edges);
std::string GetSerializedKey() const { return key; }
std::string GetVertexOutGid() const;

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,9 @@
#include "kvstore/kvstore.hpp"
#include "storage/v2/constraints/constraint_violation.hpp"
#include "storage/v2/disk/edge_import_mode_cache.hpp"
#include "storage/v2/disk/rocksdb_storage.hpp"
#include "storage/v2/edge_import_mode.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/property_store.hpp"
@ -44,6 +46,55 @@ class DiskStorage final : public Storage {
explicit DiskAccessor(DiskStorage *storage, IsolationLevel isolation_level, StorageMode storage_mode);
/// TODO: const methods?
void LoadVerticesToMainMemoryCache();
void LoadVerticesFromMainStorageToEdgeImportCache();
void HandleMainLoadingForEdgeImportCache();
void LoadVerticesFromLabelIndexStorageToEdgeImportCache(LabelId label);
void HandleLoadingLabelForEdgeImportCache(LabelId label);
void LoadVerticesFromLabelPropertyIndexStorageToEdgeImportCache(LabelId label, PropertyId property);
void HandleLoadingLabelPropertyForEdgeImportCache(LabelId label, PropertyId property);
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelIndexCache(LabelId label, View view,
std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices);
void LoadVerticesFromDiskLabelIndex(LabelId label, const std::unordered_set<storage::Gid> &gids,
std::list<Delta> &index_deltas, utils::SkipList<Vertex> *indexed_vertices);
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
LabelId label, PropertyId property, View view, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices, const auto &label_property_filter);
void LoadVerticesFromDiskLabelPropertyIndex(LabelId label, PropertyId property,
const std::unordered_set<storage::Gid> &gids,
std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices,
const auto &label_property_filter);
void LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(LabelId label, PropertyId property,
const std::unordered_set<storage::Gid> &gids,
const PropertyValue &value,
std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices);
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch(
LabelId label, PropertyId property, View view, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices);
void LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(
LabelId label, PropertyId property, const std::unordered_set<storage::Gid> &gids,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices);
public:
DiskAccessor(const DiskAccessor &) = delete;
DiskAccessor &operator=(const DiskAccessor &) = delete;
@ -61,48 +112,14 @@ class DiskStorage final : public Storage {
VerticesIterable Vertices(LabelId label, View view) override;
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelIndexCache(LabelId label, View view,
std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices);
void LoadVerticesFromDiskLabelIndex(LabelId label, const std::unordered_set<storage::Gid> &gids,
std::list<Delta> &index_deltas, utils::SkipList<Vertex> *indexed_vertices);
VerticesIterable Vertices(LabelId label, PropertyId property, View view) override;
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCache(
LabelId label, PropertyId property, View view, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices, const auto &label_property_filter);
void LoadVerticesFromDiskLabelPropertyIndex(LabelId label, PropertyId property,
const std::unordered_set<storage::Gid> &gids,
std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices,
const auto &label_property_filter);
VerticesIterable Vertices(LabelId label, PropertyId property, const PropertyValue &value, View view) override;
void LoadVerticesFromDiskLabelPropertyIndexWithPointValueLookup(LabelId label, PropertyId property,
const std::unordered_set<storage::Gid> &gids,
const PropertyValue &value,
std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices);
VerticesIterable Vertices(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
std::unordered_set<Gid> MergeVerticesFromMainCacheWithLabelPropertyIndexCacheForIntervalSearch(
LabelId label, PropertyId property, View view, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices);
void LoadVerticesFromDiskLabelPropertyIndexForIntervalSearch(
LabelId label, PropertyId property, const std::unordered_set<storage::Gid> &gids,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, std::list<Delta> &index_deltas,
utils::SkipList<Vertex> *indexed_vertices);
uint64_t ApproximateVertexCount() const override;
uint64_t ApproximateVertexCount(LabelId /*label*/) const override { return 10; }
@ -200,14 +217,13 @@ class DiskStorage final : public Storage {
void FinalizeTransaction() override;
std::optional<storage::VertexAccessor> LoadVertexToLabelIndexCache(
LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
std::string &&key, std::string &&value, Delta *index_delta,
utils::SkipList<storage::Vertex>::Accessor index_accessor);
std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(const std::string &key, const std::string &value,
const std::string &ts);
std::optional<storage::VertexAccessor> LoadVertexToMainMemoryCache(std::string &&key, std::string &&value,
std::string &&ts);
std::optional<storage::VertexAccessor> LoadVertexToLabelPropertyIndexCache(
LabelId indexing_label, std::string &&key, std::string &&value, Delta *index_delta,
std::string &&key, std::string &&value, Delta *index_delta,
utils::SkipList<storage::Vertex>::Accessor index_accessor);
std::optional<storage::EdgeAccessor> DeserializeEdge(const rocksdb::Slice &key, const rocksdb::Slice &value,
@ -222,31 +238,41 @@ class DiskStorage final : public Storage {
void PrefetchEdges(const VertexAccessor &vertex_acc, EdgeDirection edge_direction);
Result<EdgeAccessor> CreateEdgeFromDisk(const VertexAccessor *from, const VertexAccessor *to, EdgeTypeId edge_type,
storage::Gid gid, std::string_view properties,
const std::string &old_disk_key, const std::string &ts);
storage::Gid gid, std::string_view properties, std::string &&old_disk_key,
std::string &&ts);
/// Flushes vertices and edges to the disk with the commit timestamp.
/// At the time of calling, the commit_timestamp_ must already exist.
/// After this method, the vertex and edge caches are cleared.
[[nodiscard]] utils::BasicResult<StorageDataManipulationError, void> FlushMainMemoryCache();
[[nodiscard]] utils::BasicResult<StorageDataManipulationError, void> FlushIndexCache();
[[nodiscard]] utils::BasicResult<StorageDataManipulationError, void> FlushDeletedVertices();
[[nodiscard]] utils::BasicResult<StorageDataManipulationError, void> FlushDeletedEdges();
[[nodiscard]] utils::BasicResult<StorageDataManipulationError, void> FlushVertices(
const auto &vertex_acc, std::vector<std::vector<PropertyValue>> &unique_storage);
[[nodiscard]] utils::BasicResult<StorageDataManipulationError, void> FlushModifiedEdges(const auto &edge_acc);
[[nodiscard]] utils::BasicResult<StorageDataManipulationError, void> ClearDanglingVertices();
[[nodiscard]] utils::BasicResult<StorageDataManipulationError, void> CheckVertexConstraintsBeforeCommit(
const Vertex &vertex, std::vector<std::vector<PropertyValue>> &unique_storage) const;
bool WriteVertexToDisk(const Vertex &vertex);
bool WriteEdgeToDisk(EdgeRef edge, const std::string &serializedEdgeKey);
bool WriteEdgeToDisk(const std::string &serialized_edge_key, const std::string &serialized_edge_value);
bool DeleteVertexFromDisk(const std::string &vertex);
bool DeleteEdgeFromDisk(const std::string &edge);
/// Main storage
utils::SkipList<storage::Vertex> vertices_;
std::vector<std::unique_ptr<utils::SkipList<storage::Vertex>>> index_storage_;
utils::SkipList<Vertex> vertices_;
std::vector<std::unique_ptr<utils::SkipList<Vertex>>> index_storage_;
/// We need them because query context for indexed reading is cleared after the query is done not after the
/// transaction is done
std::vector<std::list<Delta>> index_deltas_storage_;
utils::SkipList<storage::Edge> edges_;
utils::SkipList<Edge> edges_;
Config::Items config_;
std::unordered_set<std::string> edges_to_delete_;
std::vector<std::pair<std::string, std::string>> vertices_to_delete_;
@ -290,6 +316,10 @@ class DiskStorage final : public Storage {
Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) override;
void SetEdgeImportMode(EdgeImportMode edge_import_status);
EdgeImportMode GetEdgeImportMode() const;
private:
void LoadIndexInfoIfExists() const;
@ -339,9 +369,12 @@ class DiskStorage final : public Storage {
void FreeMemory(std::unique_lock<utils::RWLock> /*lock*/) override {}
void EstablishNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); }
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
void EstablishNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); }
EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE};
std::unique_ptr<EdgeImportModeCache> edge_import_mode_cache_{nullptr};
auto CreateReplicationClient(std::string name, io::network::Endpoint endpoint, replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config)

View File

@ -126,6 +126,11 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value);
edge_.ptr->properties.SetProperty(property, value);
if (transaction_->IsDiskStorage()) {
ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_);
transaction_->AddModifiedEdge(Gid(), modified_edge);
}
return std::move(current_value);
}

View File

@ -79,9 +79,8 @@ class EdgeAccessor final {
Gid Gid() const noexcept {
if (config_.properties_on_edges) {
return edge_.ptr->gid;
} else {
return edge_.gid;
}
return edge_.gid;
}
bool IsCycle() const { return from_vertex_ == to_vertex_; }

View File

@ -0,0 +1,28 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
#include <string>
namespace memgraph::storage {
enum class EdgeImportMode : std::uint8_t { ACTIVE, INACTIVE };
constexpr const char *EdgeImportModeToString(memgraph::storage::EdgeImportMode edge_import_mode) {
if (edge_import_mode == EdgeImportMode::INACTIVE) {
return "INACTIVE";
}
return "ACTIVE";
}
} // namespace memgraph::storage

View File

@ -38,14 +38,14 @@ void Indices::UpdateOnSetProperty(PropertyId property, const PropertyValue &valu
label_property_index_->UpdateOnSetProperty(property, value, vertex, tx);
}
Indices::Indices(Constraints *constraints, const Config &config, StorageMode storage_mode) {
std::invoke([this, constraints, config, storage_mode]() {
Indices::Indices(const Config &config, StorageMode storage_mode) {
std::invoke([this, config, storage_mode]() {
if (storage_mode == StorageMode::IN_MEMORY_TRANSACTIONAL || storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
label_index_ = std::make_unique<InMemoryLabelIndex>(this, constraints, config);
label_property_index_ = std::make_unique<InMemoryLabelPropertyIndex>(this, constraints, config);
label_index_ = std::make_unique<InMemoryLabelIndex>(this, config);
label_property_index_ = std::make_unique<InMemoryLabelPropertyIndex>(this, config);
} else {
label_index_ = std::make_unique<DiskLabelIndex>(this, constraints, config);
label_property_index_ = std::make_unique<DiskLabelPropertyIndex>(this, constraints, config);
label_index_ = std::make_unique<DiskLabelIndex>(this, config);
label_property_index_ = std::make_unique<DiskLabelPropertyIndex>(this, config);
}
});
}

View File

@ -19,7 +19,7 @@
namespace memgraph::storage {
struct Indices {
Indices(Constraints *constraints, const Config &config, StorageMode storage_mode);
Indices(const Config &config, StorageMode storage_mode);
Indices(const Indices &) = delete;
Indices(Indices &&) = delete;

View File

@ -19,8 +19,7 @@ namespace memgraph::storage {
class LabelIndex {
public:
LabelIndex(Indices *indices, Constraints *constraints, const Config &config)
: indices_(indices), constraints_(constraints), config_(config) {}
LabelIndex(Indices *indices, const Config &config) : indices_(indices), config_(config) {}
LabelIndex(const LabelIndex &) = delete;
LabelIndex(LabelIndex &&) = delete;
@ -44,7 +43,6 @@ class LabelIndex {
protected:
/// TODO: andi maybe no need for have those in abstract class if disk storage isn't using it
Indices *indices_;
Constraints *constraints_;
Config config_;
};

View File

@ -19,8 +19,7 @@ namespace memgraph::storage {
class LabelPropertyIndex {
public:
LabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config)
: indices_(indices), constraints_(constraints), config_(config) {}
LabelPropertyIndex(Indices *indices, const Config &config) : indices_(indices), config_(config) {}
LabelPropertyIndex(const LabelPropertyIndex &) = delete;
LabelPropertyIndex(LabelPropertyIndex &&) = delete;
@ -52,7 +51,6 @@ class LabelPropertyIndex {
protected:
Indices *indices_;
Constraints *constraints_;
Config config_;
};

View File

@ -10,12 +10,12 @@
// licenses/APL.txt.
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/indices_utils.hpp"
namespace memgraph::storage {
InMemoryLabelIndex::InMemoryLabelIndex(Indices *indices, Constraints *constraints, Config config)
: LabelIndex(indices, constraints, config) {}
InMemoryLabelIndex::InMemoryLabelIndex(Indices *indices, Config config) : LabelIndex(indices, config) {}
void InMemoryLabelIndex::UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, const Transaction &tx) {
auto it = index_.find(added_label);
@ -151,10 +151,11 @@ void InMemoryLabelIndex::RunGC() {
}
}
InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(LabelId label, View view, Transaction *transaction) {
InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(LabelId label, View view, Transaction *transaction,
Constraints *constraints) {
const auto it = index_.find(label);
MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint());
return {it->second.access(), label, view, transaction, indices_, constraints_, config_};
return {it->second.access(), label, view, transaction, indices_, constraints, config_};
}
void InMemoryLabelIndex::SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats) {

View File

@ -11,6 +11,7 @@
#pragma once
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/label_index.hpp"
#include "storage/v2/vertex.hpp"
@ -37,7 +38,7 @@ class InMemoryLabelIndex : public storage::LabelIndex {
};
public:
InMemoryLabelIndex(Indices *indices, Constraints *constraints, Config config);
InMemoryLabelIndex(Indices *indices, Config config);
/// @throw std::bad_alloc
void UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, const Transaction &tx) override;
@ -99,7 +100,7 @@ class InMemoryLabelIndex : public storage::LabelIndex {
void RunGC();
Iterable Vertices(LabelId label, View view, Transaction *transaction);
Iterable Vertices(LabelId label, View view, Transaction *transaction, Constraints *constraints);
void SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats);

View File

@ -10,6 +10,7 @@
// licenses/APL.txt.
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/indices_utils.hpp"
namespace memgraph::storage {
@ -32,12 +33,13 @@ bool InMemoryLabelPropertyIndex::Entry::operator<(const PropertyValue &rhs) cons
bool InMemoryLabelPropertyIndex::Entry::operator==(const PropertyValue &rhs) const { return value == rhs; }
InMemoryLabelPropertyIndex::InMemoryLabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config)
: LabelPropertyIndex(indices, constraints, config) {}
InMemoryLabelPropertyIndex::InMemoryLabelPropertyIndex(Indices *indices, const Config &config)
: LabelPropertyIndex(indices, config) {}
bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property,
utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info) {
spdlog::trace("Vertices size when creating index: {}", vertices.size());
auto create_index_seq = [this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator it) {
using IndexAccessor = decltype(it->second.access());
@ -426,11 +428,12 @@ void InMemoryLabelPropertyIndex::RunGC() {
InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices(
LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Transaction *transaction) {
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Transaction *transaction,
Constraints *constraints) {
auto it = index_.find({label, property});
MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint());
return {it->second.access(), label, property, lower_bound, upper_bound, view,
transaction, indices_, constraints_, config_};
transaction, indices_, constraints, config_};
}
} // namespace memgraph::storage

View File

@ -11,6 +11,7 @@
#pragma once
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/label_property_index.hpp"
namespace memgraph::storage {
@ -39,7 +40,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
};
public:
InMemoryLabelPropertyIndex(Indices *indices, Constraints *constraints, const Config &config);
InMemoryLabelPropertyIndex(Indices *indices, const Config &config);
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices,
@ -131,7 +132,8 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
void RunGC();
Iterable Vertices(LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Transaction *transaction);
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Transaction *transaction,
Constraints *constraints);
private:
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>> index_;

View File

@ -139,10 +139,9 @@ void InMemoryReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Bu
storage_->constraints_.existence_constraints_ = std::make_unique<ExistenceConstraints>();
storage_->constraints_.unique_constraints_ = std::make_unique<InMemoryUniqueConstraints>();
storage_->indices_.label_index_ =
std::make_unique<InMemoryLabelIndex>(&storage_->indices_, &storage_->constraints_, storage_->config_);
storage_->indices_.label_index_ = std::make_unique<InMemoryLabelIndex>(&storage_->indices_, storage_->config_);
storage_->indices_.label_property_index_ =
std::make_unique<InMemoryLabelPropertyIndex>(&storage_->indices_, &storage_->constraints_, storage_->config_);
std::make_unique<InMemoryLabelPropertyIndex>(&storage_->indices_, storage_->config_);
try {
spdlog::debug("Loading snapshot");
auto &epoch =

View File

@ -1083,14 +1083,14 @@ InMemoryStorage::DropUniqueConstraint(LabelId label, const std::set<PropertyId>
VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, View view) {
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(storage_->indices_.label_index_.get());
return VerticesIterable(mem_label_index->Vertices(label, view, &transaction_));
return VerticesIterable(mem_label_index->Vertices(label, view, &transaction_, &storage_->constraints_));
}
VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, PropertyId property, View view) {
auto *mem_label_property_index =
static_cast<InMemoryLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
return VerticesIterable(
mem_label_property_index->Vertices(label, property, std::nullopt, std::nullopt, view, &transaction_));
return VerticesIterable(mem_label_property_index->Vertices(label, property, std::nullopt, std::nullopt, view,
&transaction_, &storage_->constraints_));
}
VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, PropertyId property,
@ -1098,7 +1098,8 @@ VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, Prop
auto *mem_label_property_index =
static_cast<InMemoryLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
return VerticesIterable(mem_label_property_index->Vertices(label, property, utils::MakeBoundInclusive(value),
utils::MakeBoundInclusive(value), view, &transaction_));
utils::MakeBoundInclusive(value), view, &transaction_,
&storage_->constraints_));
}
VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(
@ -1106,8 +1107,8 @@ VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) {
auto *mem_label_property_index =
static_cast<InMemoryLabelPropertyIndex *>(storage_->indices_.label_property_index_.get());
return VerticesIterable(
mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, &transaction_));
return VerticesIterable(mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view,
&transaction_, &storage_->constraints_));
}
Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) {
@ -1131,7 +1132,7 @@ Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, S
start_timestamp = timestamp_++;
}
}
return {transaction_id, start_timestamp, isolation_level, storage_mode};
return {transaction_id, start_timestamp, isolation_level, storage_mode, false};
}
template <bool force>

View File

@ -0,0 +1,39 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <unordered_map>
#include "storage/v2/delta.hpp"
#include "storage/v2/edge_ref.hpp"
#include "storage/v2/id_types.hpp"
namespace memgraph::storage {
struct ModifiedEdgeInfo {
ModifiedEdgeInfo(Delta::Action delta, Gid from_vertex, Gid to_vertex, EdgeTypeId edge_type, const EdgeRef &edge)
: delta_action(delta),
src_vertex_gid(from_vertex),
dest_vertex_gid(to_vertex),
edge_type_id(edge_type),
edge_ref(edge) {}
Delta::Action delta_action;
Gid src_vertex_gid;
Gid dest_vertex_gid;
EdgeTypeId edge_type_id;
EdgeRef edge_ref;
};
using ModifiedEdgesMap = std::unordered_map<Gid, ModifiedEdgeInfo>;
} // namespace memgraph::storage

View File

@ -12,11 +12,13 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <optional>
#include "storage/v2/property_value.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/view.hpp"
#include "utils/rocksdb_serialization.hpp"
namespace memgraph::storage {
@ -68,6 +70,7 @@ inline std::size_t ApplyDeltasForRead(Transaction const *transaction, const Delt
// of the database.
if (view == View::OLD && ts == commit_timestamp &&
(cid < transaction->command_id ||
// This check is used for on-disk storage. The vertex is valid only if it was deserialized in this transaction.
(cid == transaction->command_id && delta->action == Delta::Action::DELETE_DESERIALIZED_OBJECT))) {
break;
}
@ -113,23 +116,38 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
transaction->command_id);
}
inline Delta *CreateDeleteObjectDelta(Transaction *transaction, std::list<Delta> *deltas) {
if (transaction->storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
return nullptr;
}
transaction->EnsureCommitTimestampExists();
return &deltas->emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(), transaction->command_id);
}
/// TODO: what if in-memory analytical
inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std::optional<std::string> old_disk_key,
const std::string &ts) {
std::string &&ts) {
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
transaction->EnsureCommitTimestampExists();
return &transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), std::stoull(ts), old_disk_key);
}
inline Delta *CreateDeleteDeserializedIndexObjectDelta(Transaction *transaction, std::list<Delta> &deltas,
inline Delta *CreateDeleteDeserializedObjectDelta(std::list<Delta> *deltas, std::optional<std::string> old_disk_key,
std::string &&ts) {
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
return &deltas->emplace_back(Delta::DeleteDeserializedObjectTag(), std::stoull(ts), old_disk_key);
}
inline Delta *CreateDeleteDeserializedIndexObjectDelta(std::list<Delta> &deltas,
std::optional<std::string> old_disk_key, const uint64_t ts) {
return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts, old_disk_key);
}
/// TODO: what if in-memory analytical
inline Delta *CreateDeleteDeserializedIndexObjectDelta(Transaction *transaction, std::list<Delta> &deltas,
inline Delta *CreateDeleteDeserializedIndexObjectDelta(std::list<Delta> &deltas,
std::optional<std::string> old_disk_key, const std::string &ts) {
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
return CreateDeleteDeserializedIndexObjectDelta(transaction, deltas, old_disk_key, std::stoull(ts));
return CreateDeleteDeserializedIndexObjectDelta(deltas, old_disk_key, std::stoull(ts));
}
/// This function creates a delta in the transaction for the object and links

View File

@ -49,7 +49,7 @@ Storage::Storage(Config config, StorageMode storage_mode)
config_(config),
isolation_level_(config.transaction.isolation_level),
storage_mode_(storage_mode),
indices_(&constraints_, config, storage_mode),
indices_(config, storage_mode),
constraints_(config, storage_mode),
id_(config.name),
replication_state_(config_.durability.restore_replication_state_on_startup,
@ -99,10 +99,10 @@ void Storage::SetStorageMode(StorageMode storage_mode) {
}
}
IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_level_; }
StorageMode Storage::GetStorageMode() const { return storage_mode_; }
IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_level_; }
utils::BasicResult<Storage::SetIsolationLevelError> Storage::SetIsolationLevel(IsolationLevel isolation_level) {
std::unique_lock main_guard{main_lock_};
if (storage_mode_ == storage::StorageMode::IN_MEMORY_ANALYTICAL) {

View File

@ -21,6 +21,7 @@
#include "storage/v2/delta.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/modified_edge.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex.hpp"
@ -34,13 +35,14 @@ const uint64_t kTransactionInitialId = 1ULL << 63U;
struct Transaction {
Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level,
StorageMode storage_mode)
StorageMode storage_mode, bool edge_import_mode_active)
: transaction_id(transaction_id),
start_timestamp(start_timestamp),
command_id(0),
must_abort(false),
isolation_level(isolation_level),
storage_mode(storage_mode) {}
storage_mode(storage_mode),
edge_import_mode_active(edge_import_mode_active) {}
Transaction(Transaction &&other) noexcept
: transaction_id(other.transaction_id.load(std::memory_order_acquire)),
@ -51,6 +53,7 @@ struct Transaction {
must_abort(other.must_abort),
isolation_level(other.isolation_level),
storage_mode(other.storage_mode),
edge_import_mode_active(other.edge_import_mode_active),
manyDeltasCache{std::move(other.manyDeltasCache)} {}
Transaction(const Transaction &) = delete;
@ -59,12 +62,22 @@ struct Transaction {
~Transaction() {}
bool IsDiskStorage() const { return storage_mode == StorageMode::ON_DISK_TRANSACTIONAL; }
/// @throw std::bad_alloc if failed to create the `commit_timestamp`
void EnsureCommitTimestampExists() {
if (commit_timestamp != nullptr) return;
commit_timestamp = std::make_unique<std::atomic<uint64_t>>(transaction_id.load(std::memory_order_relaxed));
}
void AddModifiedEdge(Gid gid, ModifiedEdgeInfo modified_edge) {
if (IsDiskStorage()) {
modified_edges_.emplace(gid, modified_edge);
}
}
void RemoveModifiedEdge(const Gid &gid) { modified_edges_.erase(gid); }
std::atomic<uint64_t> transaction_id;
uint64_t start_timestamp;
// The `Transaction` object is stack allocated, but the `commit_timestamp`
@ -77,11 +90,15 @@ struct Transaction {
bool must_abort;
IsolationLevel isolation_level;
StorageMode storage_mode;
bool edge_import_mode_active{false};
// A cache which is consistent to the current transaction_id + command_id.
// Used to speedup getting info about a vertex when there is a long delta
// chain involved in rebuilding that info.
mutable VertexInfoCache manyDeltasCache;
// Store modified edges GID mapped to changed Delta and serialized edge key
ModifiedEdgesMap modified_edges_;
};
inline bool operator==(const Transaction &first, const Transaction &second) {

View File

@ -15,6 +15,7 @@
#include <tuple>
#include <utility>
#include "query/exceptions.hpp"
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/indices/indices.hpp"
@ -89,6 +90,9 @@ bool VertexAccessor::IsVisible(View view) const {
}
Result<bool> VertexAccessor::AddLabel(LabelId label) {
if (transaction_->edge_import_mode_active) {
throw query::WriteVertexOperationInEdgeImportModeException();
}
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
auto guard = std::unique_lock{vertex_->lock};
@ -109,6 +113,9 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
/// TODO: move to after update and change naming to vertex after update
Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
if (transaction_->edge_import_mode_active) {
throw query::WriteVertexOperationInEdgeImportModeException();
}
auto guard = std::unique_lock{vertex_->lock};
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
@ -224,6 +231,10 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {
}
Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const PropertyValue &value) {
if (transaction_->edge_import_mode_active) {
throw query::WriteVertexOperationInEdgeImportModeException();
}
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
auto guard = std::unique_lock{vertex_->lock};
@ -249,6 +260,10 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
}
Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties) {
if (transaction_->edge_import_mode_active) {
throw query::WriteVertexOperationInEdgeImportModeException();
}
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
auto guard = std::unique_lock{vertex_->lock};
@ -268,6 +283,10 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> VertexAccessor::UpdateProperties(
std::map<storage::PropertyId, storage::PropertyValue> &properties) const {
if (transaction_->edge_import_mode_active) {
throw query::WriteVertexOperationInEdgeImportModeException();
}
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
auto guard = std::unique_lock{vertex_->lock};
@ -287,6 +306,9 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
}
Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
if (transaction_->edge_import_mode_active) {
throw query::WriteVertexOperationInEdgeImportModeException();
}
auto guard = std::unique_lock{vertex_->lock};
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;

View File

@ -12,6 +12,7 @@
#pragma once
#include "storage/v2/delta.hpp"
#include "utils/skip_list.hpp"
namespace memgraph::utils {
@ -25,6 +26,11 @@ inline std::optional<std::string> GetOldDiskKeyOrNull(storage::Delta *head) {
return std::nullopt;
}
template <typename TSkipListAccessor>
inline bool ObjectExistsInCache(TSkipListAccessor &accessor, storage::Gid gid) {
return accessor.find(gid) != accessor.end();
}
inline uint64_t GetEarliestTimestamp(storage::Delta *head) {
if (head == nullptr) return 0;
while (head->next != nullptr) {

View File

@ -237,13 +237,18 @@ inline std::string SerializeVertexAsValueForLabelIndex(storage::LabelId indexing
return SerializeVertexAsValueForAuxiliaryStorages(indexing_label, vertex_labels, property_store);
}
inline std::vector<storage::LabelId> DeserializeLabelsFromIndexStorage(const std::string &value) {
std::string labels = value.substr(0, value.find('|'));
return TransformFromStringLabels(utils::Split(labels, ","));
inline std::vector<storage::LabelId> DeserializeLabelsFromIndexStorage(const std::string &key,
const std::string &value) {
std::string labels_str{GetViewOfFirstPartOfSplit(value, '|')};
std::vector<storage::LabelId> labels{TransformFromStringLabels(utils::Split(labels_str, ","))};
std::string indexing_label = key.substr(0, key.find('|'));
labels.emplace_back(storage::LabelId::FromUint(std::stoull(indexing_label)));
return labels;
}
inline std::vector<storage::LabelId> DeserializeLabelsFromLabelIndexStorage(const std::string &value) {
return DeserializeLabelsFromIndexStorage(value);
inline std::vector<storage::LabelId> DeserializeLabelsFromLabelIndexStorage(const std::string &key,
const std::string &value) {
return DeserializeLabelsFromIndexStorage(key, value);
}
inline storage::PropertyStore DeserializePropertiesFromLabelIndexStorage(const std::string &value) {
@ -272,8 +277,9 @@ inline std::string ExtractGidFromLabelPropertyIndexStorage(const std::string &ke
return std::string(GetViewOfThirdPartOfSplit(key, '|'));
}
inline std::vector<storage::LabelId> DeserializeLabelsFromLabelPropertyIndexStorage(const std::string &value) {
return DeserializeLabelsFromIndexStorage(value);
inline std::vector<storage::LabelId> DeserializeLabelsFromLabelPropertyIndexStorage(const std::string &key,
const std::string &value) {
return DeserializeLabelsFromIndexStorage(key, value);
}
inline storage::PropertyStore DeserializePropertiesFromLabelPropertyIndexStorage(const std::string &value) {

View File

@ -186,6 +186,7 @@ enum class TypeId : uint64_t {
AST_CALL_SUBQUERY,
AST_MULTI_DATABASE_QUERY,
AST_SHOW_DATABASES,
AST_EDGE_IMPORT_MODE_QUERY,
// Symbol
SYMBOL,
};

View File

@ -61,6 +61,7 @@ add_subdirectory(load_csv)
add_subdirectory(init_file_flags)
add_subdirectory(analytical_mode)
add_subdirectory(batched_procedures)
add_subdirectory(import_mode)
add_subdirectory(concurrent_query_modules)
add_subdirectory(set_properties)

View File

@ -0,0 +1,6 @@
function(copy_import_mode_e2e_python_files FILE_NAME)
copy_e2e_python_files(import_mode ${FILE_NAME})
endfunction()
copy_import_mode_e2e_python_files(common.py)
copy_import_mode_e2e_python_files(test_command.py)

View File

@ -0,0 +1,25 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
return connection

View File

@ -0,0 +1,208 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import pytest
from common import connect, execute_and_fetch_all
def test_import_mode_disabled_for_in_memory_storages():
cursor = connect().cursor()
with pytest.raises(Exception):
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
def test_import_mode_on_off():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
def test_creating_vertices():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n) RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n) RETURN n"))) == 2
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n) RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n) RETURN n"))) == 2
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
def test_creating_edges():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3}]->(m)")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
def test_label_index_vertices_loading():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})")
execute_and_fetch_all(cursor, "CREATE INDEX ON :User")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_label_index_edges_creation():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE INDEX ON :User")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3}]->(m)")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_label_property_index_vertices_loading():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE INDEX ON :User(id)")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) WHERE n.id IS NOT NULL RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) WHERE n.id IS NOT NULL RETURN n"))) == 2
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) WHERE n.id IS NOT NULL RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) WHERE n.id IS NOT NULL RETURN n"))) == 2
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_label_property_index_edges_creation():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE INDEX ON :User(id)")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3}]->(m)")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n:User) RETURN n"))) == 2
assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_edge_deletion_in_edge_import_mode():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE INDEX ON :User(id)")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3}]->(m)")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 1
execute_and_fetch_all(cursor, "MATCH (n)-[r:FRIENDS {id: 3}]->(m) DELETE r")
assert len(list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n, r, m"))) == 0
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_modification_of_edge_properties_in_edge_import_mode():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE INDEX ON :User(id)")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 2})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
execute_and_fetch_all(
cursor, "MATCH (n:User {id: 1}), (m:User {id: 2}) CREATE (n)-[r:FRIENDS {id: 3, balance: 1000}]->(m)"
)
assert list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN r.balance"))[0][0] == 1000
execute_and_fetch_all(cursor, "MATCH (n)-[r:FRIENDS {id: 3}]->(m) SET r.balance = 2000")
assert list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN r.balance"))[0][0] == 2000
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
assert list(execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN r.balance"))[0][0] == 2000
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_throw_on_vertex_add_label_during_edge_import_mode():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
with pytest.raises(Exception):
execute_and_fetch_all(cursor, "MATCH (u:User {id: 1}) SET u:User:Person RETURN u")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_throw_on_vertex_remove_label_during_edge_import_mode():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE (u:User:Person {id: 1})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
with pytest.raises(Exception):
execute_and_fetch_all(cursor, "MATCH (u:User {id: 1}) REMOVE u:Person RETURN u")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_throw_on_vertex_set_property_during_edge_import_mode():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1, balance: 1000})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
with pytest.raises(Exception):
execute_and_fetch_all(cursor, "MATCH (u:User {id: 1}) SET u.balance = 2000")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_throw_on_vertex_create_vertex_during_edge_import_mode():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1, balance: 1000})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
with pytest.raises(Exception):
execute_and_fetch_all(cursor, "CREATE (m:Mother {id: 10})")
execute_and_fetch_all(cursor, "EDGE IMPORT MODE INACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
def test_throw_changing_import_mode_while_in_explicit_tx():
cursor = connect().cursor()
execute_and_fetch_all(cursor, "STORAGE MODE ON_DISK_TRANSACTIONAL")
execute_and_fetch_all(cursor, "CREATE (u:User {id: 1, balance: 1000})")
execute_and_fetch_all(cursor, "BEGIN")
with pytest.raises(Exception):
execute_and_fetch_all(cursor, "EDGE IMPORT MODE ACTIVE")
execute_and_fetch_all(cursor, "MATCH (n:User) DETACH DELETE n")
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,13 @@
import_mode_cluster: &import_mode_cluster
cluster:
main:
args: ["--bolt-port", "7687", "--log-level=TRACE", "--also-log-to-stderr"]
log_file: "transaction_queue.log"
setup_queries: []
validation_queries: []
workloads:
- name: "Import mode"
binary: "tests/e2e/pytest_runner.sh"
args: ["import_mode/test_command.py"]
<<: *import_mode_cluster

View File

@ -60,7 +60,7 @@ class DeltaGenerator final {
explicit Transaction(DeltaGenerator *gen)
: gen_(gen),
transaction_(gen->transaction_id_++, gen->timestamp_++, memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION,
gen->storage_mode_) {}
gen->storage_mode_, false) {}
public:
memgraph::storage::Vertex *CreateVertex() {