diff --git a/src/auth/models.cpp b/src/auth/models.cpp index 24f8ab2d7..3bd35833e 100644 --- a/src/auth/models.cpp +++ b/src/auth/models.cpp @@ -45,7 +45,9 @@ const std::vector kPermissionsAll = {Permission::MATCH, Permis Permission::FREE_MEMORY, Permission::TRIGGER, Permission::CONFIG, Permission::STREAM, Permission::MODULE_READ, Permission::MODULE_WRITE, - Permission::WEBSOCKET, Permission::TRANSACTION_MANAGEMENT}; + Permission::WEBSOCKET, Permission::TRANSACTION_MANAGEMENT, + Permission::STORAGE_MODE}; + } // namespace std::string PermissionToString(Permission permission) { @@ -94,6 +96,8 @@ std::string PermissionToString(Permission permission) { return "WEBSOCKET"; case Permission::TRANSACTION_MANAGEMENT: return "TRANSACTION_MANAGEMENT"; + case Permission::STORAGE_MODE: + return "STORAGE_MODE"; } } diff --git a/src/auth/models.hpp b/src/auth/models.hpp index 85a1ae31d..4501c18c4 100644 --- a/src/auth/models.hpp +++ b/src/auth/models.hpp @@ -40,7 +40,8 @@ enum class Permission : uint64_t { MODULE_READ = 1U << 18U, MODULE_WRITE = 1U << 19U, WEBSOCKET = 1U << 20U, - TRANSACTION_MANAGEMENT = 1U << 21U + TRANSACTION_MANAGEMENT = 1U << 21U, + STORAGE_MODE = 1U << 22U }; // clang-format on diff --git a/src/glue/auth.cpp b/src/glue/auth.cpp index 0811ff2e1..0ac58e844 100644 --- a/src/glue/auth.cpp +++ b/src/glue/auth.cpp @@ -58,6 +58,8 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) { return auth::Permission::MODULE_WRITE; case query::AuthQuery::Privilege::WEBSOCKET: return auth::Permission::WEBSOCKET; + case query::AuthQuery::Privilege::STORAGE_MODE: + return auth::Permission::STORAGE_MODE; case query::AuthQuery::Privilege::TRANSACTION_MANAGEMENT: return auth::Permission::TRANSACTION_MANAGEMENT; } diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index 659f8e9e5..d82e3a062 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -211,6 +211,21 @@ class IsolationLevelModificationInMulticommandTxException : public QueryExceptio : QueryException("Isolation level cannot be modified in multicommand transactions.") {} }; +class IsolationLevelModificationInAnalyticsException : public QueryException { + public: + IsolationLevelModificationInAnalyticsException() + : QueryException( + "Isolation level cannot be modified when storage mode is set to IN_MEMORY_ANALYTICAL." + "IN_MEMORY_ANALYTICAL mode doesn't provide any isolation guarantees, " + "you can think about it as an equivalent to READ_UNCOMMITED.") {} +}; + +class StorageModeModificationInMulticommandTxException : public QueryException { + public: + StorageModeModificationInMulticommandTxException() + : QueryException("Storage mode cannot be modified in multicommand transactions.") {} +}; + class CreateSnapshotInMulticommandTxException final : public QueryException { public: CreateSnapshotInMulticommandTxException() diff --git a/src/query/frontend/ast/ast.cpp b/src/query/frontend/ast/ast.cpp index 089a28576..631bba687 100644 --- a/src/query/frontend/ast/ast.cpp +++ b/src/query/frontend/ast/ast.cpp @@ -243,6 +243,9 @@ constexpr utils::TypeInfo query::TriggerQuery::kType{utils::TypeId::AST_TRIGGER_ constexpr utils::TypeInfo query::IsolationLevelQuery::kType{utils::TypeId::AST_ISOLATION_LEVEL_QUERY, "IsolationLevelQuery", &query::Query::kType}; +constexpr utils::TypeInfo query::StorageModeQuery::kType{utils::TypeId::AST_STORAGE_MODE_QUERY, "StorageModeQuery", + &query::Query::kType}; + constexpr utils::TypeInfo query::CreateSnapshotQuery::kType{utils::TypeId::AST_CREATE_SNAPSHOT_QUERY, "CreateSnapshotQuery", &query::Query::kType}; diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index 7d7e27278..a1ecfafff 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -2714,6 +2714,7 @@ class AuthQuery : public memgraph::query::Query { MODULE_READ, MODULE_WRITE, WEBSOCKET, + STORAGE_MODE, TRANSACTION_MANAGEMENT }; @@ -2777,7 +2778,8 @@ const std::vector kPrivilegesAll = { AuthQuery::Privilege::FREE_MEMORY, AuthQuery::Privilege::TRIGGER, AuthQuery::Privilege::CONFIG, AuthQuery::Privilege::STREAM, AuthQuery::Privilege::MODULE_READ, AuthQuery::Privilege::MODULE_WRITE, - AuthQuery::Privilege::WEBSOCKET, AuthQuery::Privilege::TRANSACTION_MANAGEMENT}; + AuthQuery::Privilege::WEBSOCKET, AuthQuery::Privilege::TRANSACTION_MANAGEMENT, + AuthQuery::Privilege::STORAGE_MODE}; class InfoQuery : public memgraph::query::Query { public: @@ -3046,6 +3048,29 @@ class IsolationLevelQuery : public memgraph::query::Query { friend class AstStorage; }; +class StorageModeQuery : public memgraph::query::Query { + public: + static const utils::TypeInfo kType; + const utils::TypeInfo &GetTypeInfo() const override { return kType; } + + enum class StorageMode { IN_MEMORY_TRANSACTIONAL, IN_MEMORY_ANALYTICAL }; + + StorageModeQuery() = default; + + DEFVISITABLE(QueryVisitor); + + memgraph::query::StorageModeQuery::StorageMode storage_mode_; + + StorageModeQuery *Clone(AstStorage *storage) const override { + StorageModeQuery *object = storage->Create(); + object->storage_mode_ = storage_mode_; + return object; + } + + private: + friend class AstStorage; +}; + class CreateSnapshotQuery : public memgraph::query::Query { public: static const utils::TypeInfo kType; diff --git a/src/query/frontend/ast/ast_visitor.hpp b/src/query/frontend/ast/ast_visitor.hpp index 39119f6ff..fe805f4b6 100644 --- a/src/query/frontend/ast/ast_visitor.hpp +++ b/src/query/frontend/ast/ast_visitor.hpp @@ -89,6 +89,7 @@ class LoadCsv; class FreeMemoryQuery; class TriggerQuery; class IsolationLevelQuery; +class StorageModeQuery; class CreateSnapshotQuery; class StreamQuery; class SettingQuery; @@ -134,6 +135,6 @@ class QueryVisitor : public utils::Visitor {}; + ShowConfigQuery, TransactionQueueQuery, StorageModeQuery, AnalyzeGraphQuery> {}; } // namespace memgraph::query diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 0d417e35d..a2aee8656 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -487,6 +487,20 @@ antlrcpp::Any CypherMainVisitor::visitIsolationLevelQuery(MemgraphCypher::Isolat return isolation_level_query; } +antlrcpp::Any CypherMainVisitor::visitStorageModeQuery(MemgraphCypher::StorageModeQueryContext *ctx) { + auto *storage_mode_query = storage_->Create(); + + storage_mode_query->storage_mode_ = std::invoke([mode = ctx->storageMode()]() { + if (mode->IN_MEMORY_ANALYTICAL()) { + return StorageModeQuery::StorageMode::IN_MEMORY_ANALYTICAL; + } + return StorageModeQuery::StorageMode::IN_MEMORY_TRANSACTIONAL; + }); + + query_ = storage_mode_query; + return storage_mode_query; +} + antlrcpp::Any CypherMainVisitor::visitCreateSnapshotQuery(MemgraphCypher::CreateSnapshotQueryContext *ctx) { query_ = storage_->Create(); return query_; @@ -1511,6 +1525,7 @@ antlrcpp::Any CypherMainVisitor::visitPrivilege(MemgraphCypher::PrivilegeContext if (ctx->MODULE_WRITE()) return AuthQuery::Privilege::MODULE_WRITE; if (ctx->WEBSOCKET()) return AuthQuery::Privilege::WEBSOCKET; if (ctx->TRANSACTION_MANAGEMENT()) return AuthQuery::Privilege::TRANSACTION_MANAGEMENT; + if (ctx->STORAGE_MODE()) return AuthQuery::Privilege::STORAGE_MODE; LOG_FATAL("Should not get here - unknown privilege!"); } diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index 7e1c17a1a..7dcc7e2fd 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -263,6 +263,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any visitIsolationLevelQuery(MemgraphCypher::IsolationLevelQueryContext *ctx) override; + /** + * @return StorageModeQuery* + */ + antlrcpp::Any visitStorageModeQuery(MemgraphCypher::StorageModeQueryContext *ctx) override; + /** * @return CreateSnapshotQuery* */ diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 04e6a7aba..07ddd8c52 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -59,6 +59,8 @@ memgraphCypherKeyword : cypherKeyword | HEADER | IDENTIFIED | ISOLATION + | IN_MEMORY_ANALYTICAL + | IN_MEMORY_TRANSACTIONAL | KAFKA | LABELS | LEVEL @@ -88,6 +90,7 @@ memgraphCypherKeyword : cypherKeyword | SNAPSHOT | START | STATS + | STORAGE | STREAM | STREAMS | SYNC @@ -127,6 +130,7 @@ query : cypherQuery | freeMemoryQuery | triggerQuery | isolationLevelQuery + | storageModeQuery | createSnapshotQuery | streamQuery | settingQuery @@ -277,6 +281,7 @@ privilege : CREATE | MODULE_WRITE | WEBSOCKET | TRANSACTION_MANAGEMENT + | STORAGE_MODE ; granularPrivilege : NOTHING | READ | UPDATE | CREATE_DELETE ; @@ -354,6 +359,10 @@ isolationLevelScope : GLOBAL | SESSION | NEXT ; isolationLevelQuery : SET isolationLevelScope TRANSACTION ISOLATION LEVEL isolationLevel ; +storageMode : IN_MEMORY_ANALYTICAL | IN_MEMORY_TRANSACTIONAL ; + +storageModeQuery : STORAGE MODE storageMode ; + createSnapshotQuery : CREATE SNAPSHOT ; streamName : symbolicName ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index d081419a7..62a2d916d 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -25,103 +25,107 @@ import CypherLexer ; UNDERSCORE : '_' ; -AFTER : A F T E R ; -ALTER : A L T E R ; -ANALYZE : A N A L Y Z E ; -ASYNC : A S Y N C ; -AUTH : A U T H ; -BAD : B A D ; -BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ; -BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ; -BATCH_SIZE : B A T C H UNDERSCORE S I Z E ; -BEFORE : B E F O R E ; -BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ; -CALL : C A L L ; -CHECK : C H E C K ; -CLEAR : C L E A R ; -COMMIT : C O M M I T ; -COMMITTED : C O M M I T T E D ; -CONFIG : C O N F I G ; -CONFIGS : C O N F I G S; -CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ; -CREATE_DELETE : C R E A T E UNDERSCORE D E L E T E ; -CREDENTIALS : C R E D E N T I A L S ; -CSV : C S V ; -DATA : D A T A ; -DELIMITER : D E L I M I T E R ; -DATABASE : D A T A B A S E ; -DENY : D E N Y ; -DIRECTORY : D I R E C T O R Y ; -DROP : D R O P ; -DUMP : D U M P ; -DURABILITY : D U R A B I L I T Y ; -EDGE_TYPES : E D G E UNDERSCORE T Y P E S ; -EXECUTE : E X E C U T E ; -FOR : F O R ; -FOREACH : F O R E A C H; -FREE : F R E E ; -FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ; -FROM : F R O M ; -GLOBAL : G L O B A L ; -GRANT : G R A N T ; -GRAPH : G R A P H ; -GRANTS : G R A N T S ; -HEADER : H E A D E R ; -IDENTIFIED : I D E N T I F I E D ; -IGNORE : I G N O R E ; -ISOLATION : I S O L A T I O N ; -KAFKA : K A F K A ; -LABELS : L A B E L S ; -LEVEL : L E V E L ; -LOAD : L O A D ; -LOCK : L O C K ; -MAIN : M A I N ; -MODE : M O D E ; -MODULE_READ : M O D U L E UNDERSCORE R E A D ; -MODULE_WRITE : M O D U L E UNDERSCORE W R I T E ; -NEXT : N E X T ; -NO : N O ; -NOTHING : N O T H I N G ; -PASSWORD : P A S S W O R D ; -PORT : P O R T ; -PRIVILEGES : P R I V I L E G E S ; -PULSAR : P U L S A R ; -READ : R E A D ; -READ_FILE : R E A D UNDERSCORE F I L E ; -REGISTER : R E G I S T E R ; -REPLICA : R E P L I C A ; -REPLICAS : R E P L I C A S ; -REPLICATION : R E P L I C A T I O N ; -REVOKE : R E V O K E ; -ROLE : R O L E ; -ROLES : R O L E S ; -QUOTE : Q U O T E ; -SERVICE_URL : S E R V I C E UNDERSCORE U R L ; -SESSION : S E S S I O N ; -SETTING : S E T T I N G ; -SETTINGS : S E T T I N G S ; -SNAPSHOT : S N A P S H O T ; -START : S T A R T ; -STATISTICS : S T A T I S T I C S ; -STATS : S T A T S ; -STOP : S T O P ; -STREAM : S T R E A M ; -STREAMS : S T R E A M S ; -SYNC : S Y N C ; -TERMINATE : T E R M I N A T E ; -TIMEOUT : T I M E O U T ; -TO : T O ; -TOPICS : T O P I C S; -TRANSACTION : T R A N S A C T I O N ; -TRANSACTION_MANAGEMENT : T R A N S A C T I O N UNDERSCORE M A N A G E M E N T ; -TRANSACTIONS : T R A N S A C T I O N S ; -TRANSFORM : T R A N S F O R M ; -TRIGGER : T R I G G E R ; -TRIGGERS : T R I G G E R S ; -UNCOMMITTED : U N C O M M I T T E D ; -UNLOCK : U N L O C K ; -UPDATE : U P D A T E ; -USER : U S E R ; -USERS : U S E R S ; -VERSION : V E R S I O N ; -WEBSOCKET : W E B S O C K E T ; +AFTER : A F T E R ; +ALTER : A L T E R ; +ANALYZE : A N A L Y Z E ; +ASYNC : A S Y N C ; +AUTH : A U T H ; +BAD : B A D ; +BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ; +BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ; +BATCH_SIZE : B A T C H UNDERSCORE S I Z E ; +BEFORE : B E F O R E ; +BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ; +CALL : C A L L ; +CHECK : C H E C K ; +CLEAR : C L E A R ; +COMMIT : C O M M I T ; +COMMITTED : C O M M I T T E D ; +CONFIG : C O N F I G ; +CONFIGS : C O N F I G S; +CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ; +CREATE_DELETE : C R E A T E UNDERSCORE D E L E T E ; +CREDENTIALS : C R E D E N T I A L S ; +CSV : C S V ; +DATA : D A T A ; +DELIMITER : D E L I M I T E R ; +DATABASE : D A T A B A S E ; +DENY : D E N Y ; +DIRECTORY : D I R E C T O R Y ; +DROP : D R O P ; +DUMP : D U M P ; +DURABILITY : D U R A B I L I T Y ; +EDGE_TYPES : E D G E UNDERSCORE T Y P E S ; +EXECUTE : E X E C U T E ; +FOR : F O R ; +FOREACH : F O R E A C H; +FREE : F R E E ; +FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ; +FROM : F R O M ; +GLOBAL : G L O B A L ; +GRANT : G R A N T ; +GRAPH : G R A P H ; +GRANTS : G R A N T S ; +HEADER : H E A D E R ; +IDENTIFIED : I D E N T I F I E D ; +IGNORE : I G N O R E ; +ISOLATION : I S O L A T I O N ; +IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ; +IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ; +KAFKA : K A F K A ; +LABELS : L A B E L S ; +LEVEL : L E V E L ; +LOAD : L O A D ; +LOCK : L O C K ; +MAIN : M A I N ; +MODE : M O D E ; +MODULE_READ : M O D U L E UNDERSCORE R E A D ; +MODULE_WRITE : M O D U L E UNDERSCORE W R I T E ; +NEXT : N E X T ; +NO : N O ; +NOTHING : N O T H I N G ; +PASSWORD : P A S S W O R D ; +PORT : P O R T ; +PRIVILEGES : P R I V I L E G E S ; +PULSAR : P U L S A R ; +READ : R E A D ; +READ_FILE : R E A D UNDERSCORE F I L E ; +REGISTER : R E G I S T E R ; +REPLICA : R E P L I C A ; +REPLICAS : R E P L I C A S ; +REPLICATION : R E P L I C A T I O N ; +REVOKE : R E V O K E ; +ROLE : R O L E ; +ROLES : R O L E S ; +QUOTE : Q U O T E ; +SERVICE_URL : S E R V I C E UNDERSCORE U R L ; +SESSION : S E S S I O N ; +SETTING : S E T T I N G ; +SETTINGS : S E T T I N G S ; +SNAPSHOT : S N A P S H O T ; +START : S T A R T ; +STATISTICS : S T A T I S T I C S ; +STATS : S T A T S ; +STOP : S T O P ; +STORAGE : S T O R A G E; +STORAGE_MODE : S T O R A G E UNDERSCORE MODE; +STREAM : S T R E A M ; +STREAMS : S T R E A M S ; +SYNC : S Y N C ; +TERMINATE : T E R M I N A T E ; +TIMEOUT : T I M E O U T ; +TO : T O ; +TOPICS : T O P I C S; +TRANSACTION : T R A N S A C T I O N ; +TRANSACTION_MANAGEMENT : T R A N S A C T I O N UNDERSCORE M A N A G E M E N T ; +TRANSACTIONS : T R A N S A C T I O N S ; +TRANSFORM : T R A N S F O R M ; +TRIGGER : T R I G G E R ; +TRIGGERS : T R I G G E R S ; +UNCOMMITTED : U N C O M M I T T E D ; +UNLOCK : U N L O C K ; +UPDATE : U P D A T E ; +USER : U S E R ; +USERS : U S E R S ; +VERSION : V E R S I O N ; +WEBSOCKET : W E B S O C K E T ; diff --git a/src/query/frontend/semantic/required_privileges.cpp b/src/query/frontend/semantic/required_privileges.cpp index 6e90bd3a3..abb11bb63 100644 --- a/src/query/frontend/semantic/required_privileges.cpp +++ b/src/query/frontend/semantic/required_privileges.cpp @@ -78,6 +78,8 @@ class PrivilegeExtractor : public QueryVisitor, public HierarchicalTreeVis void Visit(IsolationLevelQuery &isolation_level_query) override { AddPrivilege(AuthQuery::Privilege::CONFIG); } + void Visit(StorageModeQuery & /*storage_mode_query*/) override { AddPrivilege(AuthQuery::Privilege::STORAGE_MODE); } + void Visit(CreateSnapshotQuery &create_snapshot_query) override { AddPrivilege(AuthQuery::Privilege::DURABILITY); } void Visit(SettingQuery & /*setting_query*/) override { AddPrivilege(AuthQuery::Privilege::CONFIG); } diff --git a/src/query/frontend/stripped_lexer_constants.hpp b/src/query/frontend/stripped_lexer_constants.hpp index b4e064cde..8efe86614 100644 --- a/src/query/frontend/stripped_lexer_constants.hpp +++ b/src/query/frontend/stripped_lexer_constants.hpp @@ -208,7 +208,10 @@ const trie::Trie kKeywords = {"union", "websocket", "foreach", "labels", - "edge_types"}; + "edge_types", + "off", + "in_memory_transactional", + "in_memory_analytical"}; // Unicode codepoints that are allowed at the start of the unescaped name. const std::bitset kUnescapedNameAllowedStarts( diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index a94bef1c9..6d47f8633 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -1169,7 +1169,6 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) if (in_explicit_transaction_) { throw ExplicitTransactionUsageException("Nested transactions are not supported."); } - in_explicit_transaction_ = true; expect_rollback_ = false; @@ -2000,7 +1999,16 @@ constexpr auto ToStorageIsolationLevel(const IsolationLevelQuery::IsolationLevel } } -PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_explicit_transaction, +constexpr auto ToStorageMode(const StorageModeQuery::StorageMode storage_mode) noexcept { + switch (storage_mode) { + case StorageModeQuery::StorageMode::IN_MEMORY_TRANSACTIONAL: + return storage::StorageMode::IN_MEMORY_TRANSACTIONAL; + case StorageModeQuery::StorageMode::IN_MEMORY_ANALYTICAL: + return storage::StorageMode::IN_MEMORY_ANALYTICAL; + } +} + +PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, InterpreterContext *interpreter_context, Interpreter *interpreter) { if (in_explicit_transaction) { throw IsolationLevelModificationInMulticommandTxException(); @@ -2015,7 +2023,15 @@ PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_expli interpreter]() -> std::function { switch (isolation_level_query->isolation_level_scope_) { case IsolationLevelQuery::IsolationLevelScope::GLOBAL: - return [interpreter_context, isolation_level] { interpreter_context->db->SetIsolationLevel(isolation_level); }; + return [interpreter_context, isolation_level] { + if (auto maybe_error = interpreter_context->db->SetIsolationLevel(isolation_level); maybe_error.HasError()) { + switch (maybe_error.GetError()) { + case storage::Storage::SetIsolationLevelError::DisabledForAnalyticalMode: + throw IsolationLevelModificationInAnalyticsException(); + break; + } + } + }; case IsolationLevelQuery::IsolationLevelScope::SESSION: return [interpreter, isolation_level] { interpreter->SetSessionIsolationLevel(isolation_level); }; case IsolationLevelQuery::IsolationLevelScope::NEXT: @@ -2033,6 +2049,41 @@ PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_expli RWType::NONE}; } +PreparedQuery PrepareStorageModeQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, + InterpreterContext *interpreter_context) { + if (in_explicit_transaction) { + throw StorageModeModificationInMulticommandTxException(); + } + + auto *storage_mode_query = utils::Downcast(parsed_query.query); + MG_ASSERT(storage_mode_query); + const auto storage_mode = ToStorageMode(storage_mode_query->storage_mode_); + + auto exists_active_transaction = interpreter_context->interpreters.WithLock([](const auto &interpreters_) { + return std::any_of(interpreters_.begin(), interpreters_.end(), [](const auto &interpreter) { + return interpreter->transaction_status_.load() != TransactionStatus::IDLE; + }); + }); + if (exists_active_transaction) { + spdlog::info( + "Storage mode will be modified when there are no other active transactions. Check the status of the " + "transactions using 'SHOW TRANSACTIONS' query and ensure no other transactions are active."); + } + + auto callback = [storage_mode, interpreter_context]() -> std::function { + return [interpreter_context, storage_mode] { interpreter_context->db->SetStorageMode(storage_mode); }; + }(); + + return PreparedQuery{{}, + std::move(parsed_query.required_privileges), + [callback = std::move(callback)](AnyStream * /*stream*/, + std::optional /*n*/) -> std::optional { + callback(); + return QueryHandlerResult::COMMIT; + }, + RWType::NONE}; +} + PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_explicit_transaction, InterpreterContext *interpreter_context) { if (in_explicit_transaction) { @@ -2043,11 +2094,18 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli {}, std::move(parsed_query.required_privileges), [interpreter_context](AnyStream *stream, std::optional n) -> std::optional { - if (auto maybe_error = interpreter_context->db->CreateSnapshot(); maybe_error.HasError()) { + if (auto maybe_error = interpreter_context->db->CreateSnapshot({}); maybe_error.HasError()) { switch (maybe_error.GetError()) { case storage::Storage::CreateSnapshotError::DisabledForReplica: throw utils::BasicException( "Failed to create a snapshot. Replica instances are not allowed to create them."); + case storage::Storage::CreateSnapshotError::DisabledForAnalyticsPeriodicCommit: + spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.", + "https://memgr.ph/replication")); + break; + case storage::Storage::CreateSnapshotError::ReachedMaxNumTries: + spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support"); + break; } } return QueryHandlerResult::COMMIT; @@ -2769,6 +2827,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareStorageModeQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareTransactionQueueQuery(std::move(parsed_query), username_, in_explicit_transaction_, interpreter_context_, &*execution_db_accessor_); diff --git a/src/storage/v2/edge_accessor.cpp b/src/storage/v2/edge_accessor.cpp index abfb67ff7..d14c8d56d 100644 --- a/src/storage/v2/edge_accessor.cpp +++ b/src/storage/v2/edge_accessor.cpp @@ -117,6 +117,7 @@ Result EdgeAccessor::SetProperty(PropertyId property, co // current code always follows the logical pattern of "create a delta" and // "modify in-place". Additionally, the created delta will make other // transactions get a SERIALIZATION_ERROR. + CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value); edge_.ptr->properties.SetProperty(property, value); diff --git a/src/storage/v2/mvcc.hpp b/src/storage/v2/mvcc.hpp index 4c0e55461..47f4aaca5 100644 --- a/src/storage/v2/mvcc.hpp +++ b/src/storage/v2/mvcc.hpp @@ -12,6 +12,7 @@ #pragma once #include + #include "storage/v2/property_value.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/view.hpp" @@ -95,6 +96,9 @@ inline bool PrepareForWrite(Transaction *transaction, TObj *object) { /// a `DELETE_OBJECT` delta). /// @throw std::bad_alloc inline Delta *CreateDeleteObjectDelta(Transaction *transaction) { + if (transaction->storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { + return nullptr; + } transaction->EnsureCommitTimestampExists(); return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(), transaction->command_id); @@ -105,6 +109,9 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) { /// @throw std::bad_alloc template inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&...args) { + if (transaction->storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { + return; + } transaction->EnsureCommitTimestampExists(); auto delta = &transaction->deltas.emplace_back(std::forward(args)..., transaction->commit_timestamp.get(), transaction->command_id); diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 5d418b805..4bc101bdb 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -31,6 +31,7 @@ #include "storage/v2/replication/config.hpp" #include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/replication_persistence_helper.hpp" +#include "storage/v2/storage_mode.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/vertex_accessor.hpp" #include "utils/file.hpp" @@ -316,6 +317,7 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const { Storage::Storage(Config config) : indices_(&constraints_, config.items), isolation_level_(config.transaction.isolation_level), + storage_mode_(StorageMode::IN_MEMORY_TRANSACTIONAL), config_(config), snapshot_directory_(config_.durability.storage_directory / durability::kSnapshotDirectory), wal_directory_(config_.durability.storage_directory / durability::kWalDirectory), @@ -398,12 +400,19 @@ Storage::Storage(Config config) } if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) { snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] { - if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) { + if (auto maybe_error = this->CreateSnapshot({true}); maybe_error.HasError()) { switch (maybe_error.GetError()) { case CreateSnapshotError::DisabledForReplica: spdlog::warn( utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication")); break; + case CreateSnapshotError::DisabledForAnalyticsPeriodicCommit: + spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.", + "https://memgr.ph/durability")); + break; + case storage::Storage::CreateSnapshotError::ReachedMaxNumTries: + spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support"); + break; } } }); @@ -446,23 +455,30 @@ Storage::~Storage() { snapshot_runner_.Stop(); } if (config_.durability.snapshot_on_exit) { - if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) { + if (auto maybe_error = this->CreateSnapshot({false}); maybe_error.HasError()) { switch (maybe_error.GetError()) { case CreateSnapshotError::DisabledForReplica: spdlog::warn(utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication")); break; + case CreateSnapshotError::DisabledForAnalyticsPeriodicCommit: + spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.", + "https://memgr.ph/replication")); + break; + case storage::Storage::CreateSnapshotError::ReachedMaxNumTries: + spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support"); + break; } } } } -Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level) +Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level, StorageMode storage_mode) : storage_(storage), // The lock must be acquired before creating the transaction object to // prevent freshly created transactions from dangling in an active state // during exclusive operations. storage_guard_(storage_->main_lock_), - transaction_(storage->CreateTransaction(isolation_level)), + transaction_(storage->CreateTransaction(isolation_level, storage_mode)), is_transaction_active_(true), config_(storage->config_.items) {} @@ -490,11 +506,16 @@ VertexAccessor Storage::Accessor::CreateVertex() { OOMExceptionEnabler oom_exception; auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel); auto acc = storage_->vertices_.access(); - auto delta = CreateDeleteObjectDelta(&transaction_); + + auto *delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta}); MG_ASSERT(inserted, "The vertex must be inserted here!"); MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!"); - delta->prev.Set(&*it); + + if (delta) { + delta->prev.Set(&*it); + } + return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_); } @@ -509,11 +530,14 @@ VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) { storage_->vertex_id_.store(std::max(storage_->vertex_id_.load(std::memory_order_acquire), gid.AsUint() + 1), std::memory_order_release); auto acc = storage_->vertices_.access(); - auto delta = CreateDeleteObjectDelta(&transaction_); + + auto *delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Vertex{gid, delta}); MG_ASSERT(inserted, "The vertex must be inserted here!"); MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!"); - delta->prev.Set(&*it); + if (delta) { + delta->prev.Set(&*it); + } return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_); } @@ -656,12 +680,15 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA EdgeRef edge(gid); if (config_.properties_on_edges) { auto acc = storage_->edges_.access(); - auto delta = CreateDeleteObjectDelta(&transaction_); + + auto *delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Edge(gid, delta)); MG_ASSERT(inserted, "The edge must be inserted here!"); MG_ASSERT(it != acc.end(), "Invalid Edge accessor!"); edge = EdgeRef(&*it); - delta->prev.Set(&*it); + if (delta) { + delta->prev.Set(&*it); + } } CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); @@ -724,12 +751,15 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA EdgeRef edge(gid); if (config_.properties_on_edges) { auto acc = storage_->edges_.access(); - auto delta = CreateDeleteObjectDelta(&transaction_); + + auto *delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Edge(gid, delta)); MG_ASSERT(inserted, "The edge must be inserted here!"); MG_ASSERT(it != acc.end(), "Invalid Edge accessor!"); edge = EdgeRef(&*it); - delta->prev.Set(&*it); + if (delta) { + delta->prev.Set(&*it); + } } CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); @@ -1380,7 +1410,7 @@ VerticesIterable Storage::Accessor::Vertices(LabelId label, PropertyId property, storage_->indices_.label_property_index.Vertices(label, property, lower_bound, upper_bound, view, &transaction_)); } -Transaction Storage::CreateTransaction(IsolationLevel isolation_level) { +Transaction Storage::CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) { // We acquire the transaction engine lock here because we access (and // modify) the transaction engine variables (`transaction_id` and // `timestamp`) below. @@ -1401,7 +1431,7 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level) { start_timestamp = timestamp_++; } } - return {transaction_id, start_timestamp, isolation_level}; + return {transaction_id, start_timestamp, isolation_level, storage_mode}; } template @@ -1907,28 +1937,48 @@ bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation opera return finalized_on_all_replicas; } -utils::BasicResult Storage::CreateSnapshot() { +utils::BasicResult Storage::CreateSnapshot(std::optional is_periodic) { if (replication_role_.load() != ReplicationRole::MAIN) { return CreateSnapshotError::DisabledForReplica; } + auto snapshot_creator = [this]() { + auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_); + // Create snapshot. + durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_, + config_.durability.snapshot_retention_count, &vertices_, &edges_, &name_id_mapper_, + &indices_, &constraints_, config_.items, uuid_, epoch_id_, epoch_history_, + &file_retainer_); + // Finalize snapshot transaction. + commit_log_->MarkFinished(transaction.start_timestamp); + }; + std::lock_guard snapshot_guard(snapshot_lock_); - // Take master RW lock (for reading). - std::shared_lock storage_guard(main_lock_); + auto should_try_shared{true}; + auto max_num_tries{10}; + while (max_num_tries) { + if (should_try_shared) { + std::shared_lock storage_guard(main_lock_); + if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL) { + snapshot_creator(); + return {}; + } + } else { + std::unique_lock main_guard{main_lock_}; + if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) { + if (is_periodic && *is_periodic) { + return CreateSnapshotError::DisabledForAnalyticsPeriodicCommit; + } + snapshot_creator(); + return {}; + } + } + should_try_shared = !should_try_shared; + max_num_tries--; + } - // Create the transaction used to create the snapshot. - auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION); - - // Create snapshot. - durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_, - config_.durability.snapshot_retention_count, &vertices_, &edges_, &name_id_mapper_, - &indices_, &constraints_, config_.items, uuid_, epoch_id_, epoch_history_, - &file_retainer_); - - // Finalize snapshot transaction. - commit_log_->MarkFinished(transaction.start_timestamp); - return {}; + return CreateSnapshotError::ReachedMaxNumTries; } bool Storage::LockPath() { @@ -2114,11 +2164,23 @@ std::vector Storage::ReplicasInfo() { }); } -void Storage::SetIsolationLevel(IsolationLevel isolation_level) { +utils::BasicResult Storage::SetIsolationLevel(IsolationLevel isolation_level) { std::unique_lock main_guard{main_lock_}; + if (storage_mode_ == storage::StorageMode::IN_MEMORY_ANALYTICAL) { + return Storage::SetIsolationLevelError::DisabledForAnalyticalMode; + } + isolation_level_ = isolation_level; + return {}; } +void Storage::SetStorageMode(StorageMode storage_mode) { + std::unique_lock main_guard{main_lock_}; + storage_mode_ = storage_mode; +} + +StorageMode Storage::GetStorageMode() { return storage_mode_; } + void Storage::RestoreReplicas() { MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole()); if (!ShouldStoreAndRestoreReplicas()) { diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 925d81c86..aeea9d6f8 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -33,6 +33,7 @@ #include "storage/v2/mvcc.hpp" #include "storage/v2/name_id_mapper.hpp" #include "storage/v2/result.hpp" +#include "storage/v2/storage_mode.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/vertex.hpp" #include "storage/v2/vertex_accessor.hpp" @@ -200,7 +201,7 @@ class Storage final { private: friend class Storage; - explicit Accessor(Storage *storage, IsolationLevel isolation_level); + explicit Accessor(Storage *storage, IsolationLevel isolation_level, StorageMode storage_mode); public: Accessor(const Accessor &) = delete; @@ -368,7 +369,7 @@ class Storage final { }; Accessor Access(std::optional override_isolation_level = {}) { - return Accessor{this, override_isolation_level.value_or(isolation_level_)}; + return Accessor{this, override_isolation_level.value_or(isolation_level_), storage_mode_}; } const std::string &LabelToName(LabelId label) const; @@ -509,14 +510,24 @@ class Storage final { void FreeMemory(); - void SetIsolationLevel(IsolationLevel isolation_level); + enum class SetIsolationLevelError : uint8_t { DisabledForAnalyticalMode }; - enum class CreateSnapshotError : uint8_t { DisabledForReplica }; + utils::BasicResult SetIsolationLevel(IsolationLevel isolation_level); - utils::BasicResult CreateSnapshot(); + void SetStorageMode(StorageMode storage_mode); + + StorageMode GetStorageMode(); + + enum class CreateSnapshotError : uint8_t { + DisabledForReplica, + DisabledForAnalyticsPeriodicCommit, + ReachedMaxNumTries + }; + + utils::BasicResult CreateSnapshot(std::optional is_periodic); private: - Transaction CreateTransaction(IsolationLevel isolation_level); + Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode); /// The force parameter determines the behaviour of the garbage collector. /// If it's set to true, it will behave as a global operation, i.e. it can't @@ -582,6 +593,7 @@ class Storage final { utils::Synchronized, utils::SpinLock> committed_transactions_; IsolationLevel isolation_level_; + StorageMode storage_mode_; Config config_; utils::Scheduler gc_runner_; diff --git a/src/storage/v2/storage_mode.hpp b/src/storage/v2/storage_mode.hpp new file mode 100644 index 000000000..b1b01684d --- /dev/null +++ b/src/storage/v2/storage_mode.hpp @@ -0,0 +1,9 @@ +#pragma once + +#include + +namespace memgraph::storage { + +enum class StorageMode : std::uint8_t { IN_MEMORY_ANALYTICAL, IN_MEMORY_TRANSACTIONAL }; + +} // namespace memgraph::storage diff --git a/src/storage/v2/transaction.hpp b/src/storage/v2/transaction.hpp index 348c3e605..ee5ffe595 100644 --- a/src/storage/v2/transaction.hpp +++ b/src/storage/v2/transaction.hpp @@ -22,6 +22,7 @@ #include "storage/v2/edge.hpp" #include "storage/v2/isolation_level.hpp" #include "storage/v2/property_value.hpp" +#include "storage/v2/storage_mode.hpp" #include "storage/v2/vertex.hpp" #include "storage/v2/view.hpp" @@ -31,12 +32,14 @@ const uint64_t kTimestampInitialId = 0; const uint64_t kTransactionInitialId = 1ULL << 63U; struct Transaction { - Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level) + Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level, + StorageMode storage_mode) : transaction_id(transaction_id), start_timestamp(start_timestamp), command_id(0), must_abort(false), - isolation_level(isolation_level) {} + isolation_level(isolation_level), + storage_mode(storage_mode) {} Transaction(Transaction &&other) noexcept : transaction_id(other.transaction_id.load(std::memory_order_acquire)), @@ -45,7 +48,8 @@ struct Transaction { command_id(other.command_id), deltas(std::move(other.deltas)), must_abort(other.must_abort), - isolation_level(other.isolation_level) {} + isolation_level(other.isolation_level), + storage_mode(other.storage_mode) {} Transaction(const Transaction &) = delete; Transaction &operator=(const Transaction &) = delete; @@ -70,6 +74,7 @@ struct Transaction { std::list deltas; bool must_abort; IsolationLevel isolation_level; + StorageMode storage_mode; }; inline bool operator==(const Transaction &first, const Transaction &second) { diff --git a/src/storage/v2/vertex_accessor.cpp b/src/storage/v2/vertex_accessor.cpp index 3b76080cb..9682565f9 100644 --- a/src/storage/v2/vertex_accessor.cpp +++ b/src/storage/v2/vertex_accessor.cpp @@ -222,6 +222,7 @@ Result VertexAccessor::SetProperty(PropertyId property, const Pro // current code always follows the logical pattern of "create a delta" and // "modify in-place". Additionally, the created delta will make other // transactions get a SERIALIZATION_ERROR. + CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value); vertex_->properties.SetProperty(property, value); diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index ffeb447cc..8f568feab 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -171,6 +171,7 @@ enum class TypeId : uint64_t { AST_FREE_MEMORY_QUERY, AST_TRIGGER_QUERY, AST_ISOLATION_LEVEL_QUERY, + AST_STORAGE_MODE_QUERY, AST_CREATE_SNAPSHOT_QUERY, AST_STREAM_QUERY, AST_SETTING_QUERY, diff --git a/tests/e2e/lba_procedures/show_privileges.py b/tests/e2e/lba_procedures/show_privileges.py index 29f834896..83a2eae97 100644 --- a/tests/e2e/lba_procedures/show_privileges.py +++ b/tests/e2e/lba_procedures/show_privileges.py @@ -37,6 +37,7 @@ BASIC_PRIVILEGES = [ "WEBSOCKET", "MODULE_WRITE", "TRANSACTION_MANAGEMENT", + "STORAGE_MODE", ] @@ -60,7 +61,7 @@ def test_lba_procedures_show_privileges_first_user(): cursor = connect(username="Josip", password="").cursor() result = execute_and_fetch_all(cursor, "SHOW PRIVILEGES FOR Josip;") - assert len(result) == 31 + assert len(result) == 32 fine_privilege_results = [res for res in result if res[0] not in BASIC_PRIVILEGES] diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index 08a157e9b..7c93ad585 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -30,3 +30,6 @@ add_subdirectory(env_variable_check) #flag check binaries add_subdirectory(flag_check) + +#flag check binaries +add_subdirectory(storage_mode) diff --git a/tests/integration/storage_mode/CMakeLists.txt b/tests/integration/storage_mode/CMakeLists.txt new file mode 100644 index 000000000..001a9fe51 --- /dev/null +++ b/tests/integration/storage_mode/CMakeLists.txt @@ -0,0 +1,6 @@ +set(target_name memgraph__integration__storage_mode) +set(tester_target_name ${target_name}__tester) + +add_executable(${tester_target_name} tester.cpp) +set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) +target_link_libraries(${tester_target_name} mg-communication) diff --git a/tests/integration/storage_mode/runner.py b/tests/integration/storage_mode/runner.py new file mode 100644 index 000000000..9a3182149 --- /dev/null +++ b/tests/integration/storage_mode/runner.py @@ -0,0 +1,196 @@ +import argparse +import atexit +import os +import subprocess +import sys +import tempfile +import time +from typing import List + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) + +node_queries = [ + "CREATE (:Label {prop:'1'})", + "CREATE (:Label {prop:'2'})", +] + +edge_queries = ["MATCH (l1:Label),(l2:Label) WHERE l1.prop = '1' AND l2.prop = '2' CREATE (l1)-[r:edgeType1]->(l2)"] + +assertion_queries = [ + f"MATCH (n) WITH count(n) as cnt RETURN assert(cnt={len(node_queries)});", + f"MATCH (n)-[e]->(m) WITH count(e) as cnt RETURN assert(cnt={len(edge_queries)});", +] + + +def wait_for_server(port, delay=0.1): + cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] + while subprocess.call(cmd) != 0: + time.sleep(0.01) + time.sleep(delay) + + +def prepare_memgraph(memgraph_args): + # Start the memgraph binary + memgraph = subprocess.Popen(list(map(str, memgraph_args))) + time.sleep(0.1) + assert memgraph.poll() is None, "Memgraph process died prematurely!" + wait_for_server(7687) + return memgraph + + +def terminate_memgraph(memgraph): + memgraph.terminate() + time.sleep(0.1) + assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!" + + +def execute_tester( + binary, queries, should_fail=False, failure_message="", username="", password="", check_failure=True +): + args = [binary, "--username", username, "--password", password] + if should_fail: + args.append("--should-fail") + if failure_message: + args.extend(["--failure-message", failure_message]) + if check_failure: + args.append("--check-failure") + args.extend(queries) + subprocess.run(args).check_returncode() + + +def execute_test_analytical_mode(memgraph_binary: str, tester_binary: str) -> None: + def execute_queries(queries): + return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="") + + storage_directory = tempfile.TemporaryDirectory() + memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name]) + + print("\033[1;36m~~ Starting creating & loading snapshot test ~~\033[0m") + + execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"]) + + # Prepare all nodes + execute_queries(node_queries) + + # Prepare all edges + execute_queries(edge_queries) + + execute_queries(["CREATE SNAPSHOT;"]) + + print("\033[1;36m~~ Created snapshot ~~\033[0m\n") + + # Shutdown the memgraph binary with wait + terminate_memgraph(memgraph) + + # Start the memgraph binary + memgraph = prepare_memgraph( + [memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"] + ) + + execute_queries(assertion_queries) + + memgraph.terminate() + assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!" + + +def execute_test_switch_analytical_transactional(memgraph_binary: str, tester_binary: str) -> None: + def execute_queries(queries): + return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="") + + storage_directory = tempfile.TemporaryDirectory() + + # Start the memgraph binary + memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name]) + + print("\033[1;36m~~ Starting switch storage modes test ~~\033[0m") + + # switch to IN_MEMORY_ANALYTICAL + execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"]) + + # Prepare all nodes + execute_queries(node_queries) + + # switch back to IN_MEMORY_TRANSACTIONAL + execute_queries(["STORAGE MODE IN_MEMORY_TRANSACTIONAL"]) + + # Prepare all edges + execute_queries(edge_queries) + + execute_queries(["CREATE SNAPSHOT;"]) + + print("\033[1;36m~~ Created snapshot ~~\033[0m\n") + + print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n") + + # Shutdown the memgraph binary with wait + terminate_memgraph(memgraph) + + print("\033[1;36m~~ Starting memgraph with snapshot recovery ~~\033[0m\n") + + memgraph = prepare_memgraph( + [memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"] + ) + + execute_queries(assertion_queries) + + print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n") + memgraph.terminate() + assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!" + + +def execute_test_switch_transactional_analytical(memgraph_binary: str, tester_binary: str) -> None: + def execute_queries(queries): + return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="") + + storage_directory = tempfile.TemporaryDirectory() + + # Start the memgraph binary + memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name]) + + print("\033[1;36m~~ Starting switch storage modes test ~~\033[0m") + + # Prepare all nodes + execute_queries(node_queries) + + # switch to IN_MEMORY_ANALYTICAL + execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"]) + + # Prepare all edges + execute_queries(edge_queries) + + execute_queries(["CREATE SNAPSHOT;"]) + + print("\033[1;36m~~ Created snapshot ~~\033[0m\n") + + print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n") + + # Shutdown the memgraph binary with wait + terminate_memgraph(memgraph) + + print("\033[1;36m~~ Starting memgraph with snapshot recovery ~~\033[0m\n") + + memgraph = prepare_memgraph( + [memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"] + ) + + execute_queries(assertion_queries) + + print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n") + memgraph.terminate() + assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!" + + +if __name__ == "__main__": + memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph") + tester_binary = os.path.join(PROJECT_DIR, "build", "tests", "integration", "storage_mode", "tester") + + parser = argparse.ArgumentParser() + parser.add_argument("--memgraph", default=memgraph_binary) + parser.add_argument("--tester", default=tester_binary) + args = parser.parse_args() + + execute_test_analytical_mode(args.memgraph, args.tester) + execute_test_switch_analytical_transactional(args.memgraph, args.tester) + execute_test_switch_transactional_analytical(args.memgraph, args.tester) + sys.exit(0) diff --git a/tests/integration/storage_mode/tester.cpp b/tests/integration/storage_mode/tester.cpp new file mode 100644 index 000000000..96d2d925a --- /dev/null +++ b/tests/integration/storage_mode/tester.cpp @@ -0,0 +1,85 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include + +#include "communication/bolt/client.hpp" +#include "io/network/endpoint.hpp" +#include "io/network/utils.hpp" + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_int32(port, 7687, "Server port"); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); +DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); + +DEFINE_bool(check_failure, false, "Set to true to enable failure checking."); +DEFINE_bool(should_fail, false, "Set to true to expect a failure."); +DEFINE_string(failure_message, "", "Set to the expected failure message."); + +/** + * Executes queries passed as positional arguments and verifies whether they + * succeeded, failed, failed with a specific error message or executed without a + * specific error occurring. + */ +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + memgraph::communication::SSLInit sslInit; + + memgraph::io::network::Endpoint endpoint(memgraph::io::network::ResolveHostname(FLAGS_address), FLAGS_port); + + memgraph::communication::ClientContext context(FLAGS_use_ssl); + memgraph::communication::bolt::Client client(context); + + client.Connect(endpoint, FLAGS_username, FLAGS_password); + + for (int i = 1; i < argc; ++i) { + std::string query(argv[i]); + std::cout << query << std::endl; + try { + client.Execute(query, {}); + } catch (const memgraph::communication::bolt::ClientQueryException &e) { + if (!FLAGS_check_failure) { + if (!FLAGS_failure_message.empty() && e.what() == FLAGS_failure_message) { + LOG_FATAL( + "The query should have succeeded or failed with an error " + "message that isn't equal to '{}' but it failed with that error " + "message", + FLAGS_failure_message); + } + continue; + } + if (FLAGS_should_fail) { + if (!FLAGS_failure_message.empty() && e.what() != FLAGS_failure_message) { + LOG_FATAL( + "The query should have failed with an error message of '{}'' but " + "instead it failed with '{}'", + FLAGS_failure_message, e.what()); + } + return 0; + } else { + LOG_FATAL( + "The query shoudn't have failed but it failed with an " + "error message '{}'", + e.what()); + } + } + if (!FLAGS_check_failure) continue; + if (FLAGS_should_fail) { + LOG_FATAL( + "The query should have failed but instead it executed " + "successfully!"); + } + } + + return 0; +} diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 9fcb275ad..8708bf475 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -319,7 +319,7 @@ add_unit_test(storage_v2_property_store.cpp) target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2 fmt) add_unit_test(storage_v2_wal_file.cpp) -target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 fmt) +target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 storage_test_utils fmt) add_unit_test(storage_v2_replication.cpp) target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt) @@ -327,6 +327,9 @@ target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt) add_unit_test(storage_v2_isolation_level.cpp) target_link_libraries(${test_prefix}storage_v2_isolation_level mg-storage-v2) +add_unit_test(storage_v2_storage_mode.cpp) +target_link_libraries(${test_prefix}storage_v2_storage_mode mg-storage-v2 storage_test_utils mg-query mg-glue) + add_unit_test(replication_persistence_helper.cpp) target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2) diff --git a/tests/unit/storage_test_utils.cpp b/tests/unit/storage_test_utils.cpp index 03c68a92f..88d2552ba 100644 --- a/tests/unit/storage_test_utils.cpp +++ b/tests/unit/storage_test_utils.cpp @@ -17,4 +17,13 @@ size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, mem for (auto it = vertices.begin(); it != vertices.end(); ++it, ++count) ; return count; -} \ No newline at end of file +} + +std::string_view StorageModeToString(memgraph::storage::StorageMode storage_mode) { + switch (storage_mode) { + case memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL: + return "IN_MEMORY_ANALYTICAL"; + case memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL: + return "IN_MEMORY_TRANSACTIONAL"; + } +} diff --git a/tests/unit/storage_test_utils.hpp b/tests/unit/storage_test_utils.hpp index be85a34e7..24df7904d 100644 --- a/tests/unit/storage_test_utils.hpp +++ b/tests/unit/storage_test_utils.hpp @@ -14,4 +14,9 @@ #include "storage/v2/storage.hpp" #include "storage/v2/view.hpp" -size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, memgraph::storage::View view); \ No newline at end of file +size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, memgraph::storage::View view); + +std::string_view StorageModeToString(memgraph::storage::StorageMode storage_mode); + +inline constexpr std::array storage_modes{memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL, + memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL}; diff --git a/tests/unit/storage_v2_storage_mode.cpp b/tests/unit/storage_v2_storage_mode.cpp new file mode 100644 index 000000000..5a3bde97f --- /dev/null +++ b/tests/unit/storage_v2_storage_mode.cpp @@ -0,0 +1,134 @@ + + +#include +#include +#include +#include +#include +#include + +#include "interpreter_faker.hpp" +#include "query/exceptions.hpp" +#include "storage/v2/isolation_level.hpp" +#include "storage/v2/storage.hpp" +#include "storage/v2/storage_mode.hpp" +#include "storage/v2/vertex_accessor.hpp" +#include "storage_test_utils.hpp" + +class StorageModeTest : public ::testing::TestWithParam { + public: + struct PrintStringParamToName { + std::string operator()(const testing::TestParamInfo &info) { + return std::string(StorageModeToString(static_cast(info.param))); + } + }; +}; + +// you should be able to see nodes if there is analytics mode +TEST_P(StorageModeTest, Mode) { + const memgraph::storage::StorageMode storage_mode = GetParam(); + + memgraph::storage::Storage storage{ + {.transaction{.isolation_level = memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION}}}; + storage.SetStorageMode(storage_mode); + auto creator = storage.Access(); + auto other_analytics_mode_reader = storage.Access(); + + ASSERT_EQ(CountVertices(creator, memgraph::storage::View::OLD), 0); + ASSERT_EQ(CountVertices(other_analytics_mode_reader, memgraph::storage::View::OLD), 0); + + static constexpr int vertex_creation_count = 10; + { + for (size_t i = 1; i <= vertex_creation_count; i++) { + creator.CreateVertex(); + + int64_t expected_vertices_count = storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL ? i : 0; + ASSERT_EQ(CountVertices(creator, memgraph::storage::View::OLD), expected_vertices_count); + ASSERT_EQ(CountVertices(other_analytics_mode_reader, memgraph::storage::View::OLD), expected_vertices_count); + } + } + + ASSERT_FALSE(creator.Commit().HasError()); +} + +INSTANTIATE_TEST_CASE_P(ParameterizedStorageModeTests, StorageModeTest, ::testing::ValuesIn(storage_modes), + StorageModeTest::PrintStringParamToName()); + +class StorageModeMultiTxTest : public ::testing::Test { + protected: + memgraph::storage::Storage db_; + std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "MG_tests_unit_storage_mode"}; + memgraph::query::InterpreterContext interpreter_context{&db_, {}, data_directory}; + InterpreterFaker running_interpreter{&interpreter_context}, main_interpreter{&interpreter_context}; +}; + +TEST_F(StorageModeMultiTxTest, ModeSwitchInactiveTransaction) { + bool started = false; + std::jthread running_thread = std::jthread( + [this, &started](std::stop_token st, int thread_index) { + running_interpreter.Interpret("CREATE ();"); + started = true; + }, + 0); + + { + while (!started) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL); + main_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL"); + + // should change state + ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL); + + // finish thread + running_thread.request_stop(); + } +} + +TEST_F(StorageModeMultiTxTest, ModeSwitchActiveTransaction) { + // transactional state + ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL); + main_interpreter.Interpret("BEGIN"); + + bool started = false; + bool finished = false; + std::jthread running_thread = std::jthread( + [this, &started, &finished](std::stop_token st, int thread_index) { + started = true; + // running interpreter try to change + running_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL"); + finished = true; + }, + 0); + + { + while (!started) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + // should not change still + ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL); + + main_interpreter.Interpret("COMMIT"); + + while (!finished) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + // should change state + ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL); + + // finish thread + running_thread.request_stop(); + } +} + +TEST_F(StorageModeMultiTxTest, ErrorChangeIsolationLevel) { + ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL); + main_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL"); + + // should change state + ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL); + + ASSERT_THROW(running_interpreter.Interpret("SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;"), + memgraph::query::IsolationLevelModificationInAnalyticsException); +} diff --git a/tests/unit/storage_v2_wal_file.cpp b/tests/unit/storage_v2_wal_file.cpp index 5eb554aab..071bbe2da 100644 --- a/tests/unit/storage_v2_wal_file.cpp +++ b/tests/unit/storage_v2_wal_file.cpp @@ -22,6 +22,7 @@ #include "storage/v2/durability/wal.hpp" #include "storage/v2/mvcc.hpp" #include "storage/v2/name_id_mapper.hpp" +#include "storage_test_utils.hpp" #include "utils/file.hpp" #include "utils/file_locker.hpp" #include "utils/uuid.hpp" @@ -58,15 +59,18 @@ class DeltaGenerator final { explicit Transaction(DeltaGenerator *gen) : gen_(gen), - transaction_(gen->transaction_id_++, gen->timestamp_++, - memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION) {} + transaction_(gen->transaction_id_++, gen->timestamp_++, memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION, + gen->storage_mode_) {} public: memgraph::storage::Vertex *CreateVertex() { auto gid = memgraph::storage::Gid::FromUint(gen_->vertices_count_++); auto delta = memgraph::storage::CreateDeleteObjectDelta(&transaction_); auto &it = gen_->vertices_.emplace_back(gid, delta); - delta->prev.Set(&it); + if (delta != nullptr) { + delta->prev.Set(&it); + } + if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return ⁢ { memgraph::storage::durability::WalDeltaData data; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_CREATE; @@ -78,6 +82,7 @@ class DeltaGenerator final { void DeleteVertex(memgraph::storage::Vertex *vertex) { memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::RecreateObjectTag()); + if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return; { memgraph::storage::durability::WalDeltaData data; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_DELETE; @@ -91,6 +96,7 @@ class DeltaGenerator final { vertex->labels.push_back(label_id); memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::RemoveLabelTag(), label_id); + if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return; { memgraph::storage::durability::WalDeltaData data; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_ADD_LABEL; @@ -104,6 +110,7 @@ class DeltaGenerator final { auto label_id = memgraph::storage::LabelId::FromUint(gen_->mapper_.NameToId(label)); vertex->labels.erase(std::find(vertex->labels.begin(), vertex->labels.end(), label_id)); memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::AddLabelTag(), label_id); + if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return; { memgraph::storage::durability::WalDeltaData data; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_REMOVE_LABEL; @@ -121,6 +128,7 @@ class DeltaGenerator final { memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::SetPropertyTag(), property_id, old_value); props.SetProperty(property_id, value); + if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return; { memgraph::storage::durability::WalDeltaData data; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY; @@ -136,6 +144,7 @@ class DeltaGenerator final { void Finalize(bool append_transaction_end = true) { auto commit_timestamp = gen_->timestamp_++; + if (transaction_.deltas.empty()) return; for (const auto &delta : transaction_.deltas) { auto owner = delta.prev.Get(); while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) { @@ -183,12 +192,14 @@ class DeltaGenerator final { using DataT = std::vector>; - DeltaGenerator(const std::filesystem::path &data_directory, bool properties_on_edges, uint64_t seq_num) + DeltaGenerator(const std::filesystem::path &data_directory, bool properties_on_edges, uint64_t seq_num, + memgraph::storage::StorageMode storage_mode = memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL) : uuid_(memgraph::utils::GenerateUUID()), epoch_id_(memgraph::utils::GenerateUUID()), seq_num_(seq_num), wal_file_(data_directory, uuid_, epoch_id_, {.properties_on_edges = properties_on_edges}, &mapper_, seq_num, - &file_retainer_) {} + &file_retainer_), + storage_mode_(storage_mode) {} Transaction CreateTransaction() { return Transaction(this); } @@ -274,6 +285,8 @@ class DeltaGenerator final { uint64_t valid_{true}; memgraph::utils::FileRetainer file_retainer_; + + memgraph::storage::StorageMode storage_mode_; }; // NOLINTNEXTLINE(cppcoreguidelines-macro-usage) @@ -621,3 +634,64 @@ TEST_P(WalFileTest, PartialData) { ASSERT_EQ(pos, infos.size() - 2); AssertWalInfoEqual(infos[infos.size() - 1].second, memgraph::storage::durability::ReadWalInfo(current_file)); } + +class StorageModeWalFileTest : public ::testing::TestWithParam { + public: + StorageModeWalFileTest() {} + + void SetUp() override { Clear(); } + + void TearDown() override { Clear(); } + + std::vector GetFilesList() { + std::vector ret; + for (auto &item : std::filesystem::directory_iterator(storage_directory)) { + ret.push_back(item.path()); + } + std::sort(ret.begin(), ret.end()); + std::reverse(ret.begin(), ret.end()); + return ret; + } + + std::filesystem::path storage_directory{std::filesystem::temp_directory_path() / "MG_test_unit_storage_v2_wal_file"}; + struct PrintStringParamToName { + std::string operator()(const testing::TestParamInfo &info) { + return std::string(StorageModeToString(static_cast(info.param))); + } + }; + + private: + void Clear() { + if (!std::filesystem::exists(storage_directory)) return; + std::filesystem::remove_all(storage_directory); + } +}; + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(StorageModeWalFileTest, StorageModeData) { + std::vector> infos; + const memgraph::storage::StorageMode storage_mode = GetParam(); + + { + DeltaGenerator gen(storage_directory, true, 5, storage_mode); + auto tx = gen.CreateTransaction(); + tx.CreateVertex(); + tx.Finalize(true); + infos.emplace_back(gen.GetPosition(), gen.GetInfo()); + + size_t num_expected_deltas = storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL ? 0 : 2; + ASSERT_EQ(infos[0].second.num_deltas, num_expected_deltas); + + auto wal_files = GetFilesList(); + size_t num_expected_wal_files = 1; + ASSERT_EQ(num_expected_wal_files, wal_files.size()); + + if (storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) { + DeltaGenerator gen_empty(storage_directory, true, 5, storage_mode); + ASSERT_EQ(gen.GetPosition(), gen_empty.GetPosition()); + } + } +} + +INSTANTIATE_TEST_CASE_P(ParameterizedWalStorageModeTests, StorageModeWalFileTest, ::testing::ValuesIn(storage_modes), + StorageModeWalFileTest::PrintStringParamToName());