Introduce analytics mode (#772)

This commit is contained in:
Antonio Filipovic 2023-04-04 18:46:26 +02:00 committed by GitHub
parent a586f2f98d
commit 64e837b355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 920 additions and 157 deletions

View File

@ -45,7 +45,9 @@ const std::vector<Permission> kPermissionsAll = {Permission::MATCH, Permis
Permission::FREE_MEMORY, Permission::TRIGGER,
Permission::CONFIG, Permission::STREAM,
Permission::MODULE_READ, Permission::MODULE_WRITE,
Permission::WEBSOCKET, Permission::TRANSACTION_MANAGEMENT};
Permission::WEBSOCKET, Permission::TRANSACTION_MANAGEMENT,
Permission::STORAGE_MODE};
} // namespace
std::string PermissionToString(Permission permission) {
@ -94,6 +96,8 @@ std::string PermissionToString(Permission permission) {
return "WEBSOCKET";
case Permission::TRANSACTION_MANAGEMENT:
return "TRANSACTION_MANAGEMENT";
case Permission::STORAGE_MODE:
return "STORAGE_MODE";
}
}

View File

@ -40,7 +40,8 @@ enum class Permission : uint64_t {
MODULE_READ = 1U << 18U,
MODULE_WRITE = 1U << 19U,
WEBSOCKET = 1U << 20U,
TRANSACTION_MANAGEMENT = 1U << 21U
TRANSACTION_MANAGEMENT = 1U << 21U,
STORAGE_MODE = 1U << 22U
};
// clang-format on

View File

@ -58,6 +58,8 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) {
return auth::Permission::MODULE_WRITE;
case query::AuthQuery::Privilege::WEBSOCKET:
return auth::Permission::WEBSOCKET;
case query::AuthQuery::Privilege::STORAGE_MODE:
return auth::Permission::STORAGE_MODE;
case query::AuthQuery::Privilege::TRANSACTION_MANAGEMENT:
return auth::Permission::TRANSACTION_MANAGEMENT;
}

View File

@ -211,6 +211,21 @@ class IsolationLevelModificationInMulticommandTxException : public QueryExceptio
: QueryException("Isolation level cannot be modified in multicommand transactions.") {}
};
class IsolationLevelModificationInAnalyticsException : public QueryException {
public:
IsolationLevelModificationInAnalyticsException()
: QueryException(
"Isolation level cannot be modified when storage mode is set to IN_MEMORY_ANALYTICAL."
"IN_MEMORY_ANALYTICAL mode doesn't provide any isolation guarantees, "
"you can think about it as an equivalent to READ_UNCOMMITED.") {}
};
class StorageModeModificationInMulticommandTxException : public QueryException {
public:
StorageModeModificationInMulticommandTxException()
: QueryException("Storage mode cannot be modified in multicommand transactions.") {}
};
class CreateSnapshotInMulticommandTxException final : public QueryException {
public:
CreateSnapshotInMulticommandTxException()

View File

@ -243,6 +243,9 @@ constexpr utils::TypeInfo query::TriggerQuery::kType{utils::TypeId::AST_TRIGGER_
constexpr utils::TypeInfo query::IsolationLevelQuery::kType{utils::TypeId::AST_ISOLATION_LEVEL_QUERY,
"IsolationLevelQuery", &query::Query::kType};
constexpr utils::TypeInfo query::StorageModeQuery::kType{utils::TypeId::AST_STORAGE_MODE_QUERY, "StorageModeQuery",
&query::Query::kType};
constexpr utils::TypeInfo query::CreateSnapshotQuery::kType{utils::TypeId::AST_CREATE_SNAPSHOT_QUERY,
"CreateSnapshotQuery", &query::Query::kType};

View File

@ -2714,6 +2714,7 @@ class AuthQuery : public memgraph::query::Query {
MODULE_READ,
MODULE_WRITE,
WEBSOCKET,
STORAGE_MODE,
TRANSACTION_MANAGEMENT
};
@ -2777,7 +2778,8 @@ const std::vector<AuthQuery::Privilege> kPrivilegesAll = {
AuthQuery::Privilege::FREE_MEMORY, AuthQuery::Privilege::TRIGGER,
AuthQuery::Privilege::CONFIG, AuthQuery::Privilege::STREAM,
AuthQuery::Privilege::MODULE_READ, AuthQuery::Privilege::MODULE_WRITE,
AuthQuery::Privilege::WEBSOCKET, AuthQuery::Privilege::TRANSACTION_MANAGEMENT};
AuthQuery::Privilege::WEBSOCKET, AuthQuery::Privilege::TRANSACTION_MANAGEMENT,
AuthQuery::Privilege::STORAGE_MODE};
class InfoQuery : public memgraph::query::Query {
public:
@ -3046,6 +3048,29 @@ class IsolationLevelQuery : public memgraph::query::Query {
friend class AstStorage;
};
class StorageModeQuery : public memgraph::query::Query {
public:
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
enum class StorageMode { IN_MEMORY_TRANSACTIONAL, IN_MEMORY_ANALYTICAL };
StorageModeQuery() = default;
DEFVISITABLE(QueryVisitor<void>);
memgraph::query::StorageModeQuery::StorageMode storage_mode_;
StorageModeQuery *Clone(AstStorage *storage) const override {
StorageModeQuery *object = storage->Create<StorageModeQuery>();
object->storage_mode_ = storage_mode_;
return object;
}
private:
friend class AstStorage;
};
class CreateSnapshotQuery : public memgraph::query::Query {
public:
static const utils::TypeInfo kType;

View File

@ -89,6 +89,7 @@ class LoadCsv;
class FreeMemoryQuery;
class TriggerQuery;
class IsolationLevelQuery;
class StorageModeQuery;
class CreateSnapshotQuery;
class StreamQuery;
class SettingQuery;
@ -134,6 +135,6 @@ class QueryVisitor
: public utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery, IndexQuery, AuthQuery, InfoQuery,
ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery, FreeMemoryQuery, TriggerQuery,
IsolationLevelQuery, CreateSnapshotQuery, StreamQuery, SettingQuery, VersionQuery,
ShowConfigQuery, TransactionQueueQuery, AnalyzeGraphQuery> {};
ShowConfigQuery, TransactionQueueQuery, StorageModeQuery, AnalyzeGraphQuery> {};
} // namespace memgraph::query

View File

@ -487,6 +487,20 @@ antlrcpp::Any CypherMainVisitor::visitIsolationLevelQuery(MemgraphCypher::Isolat
return isolation_level_query;
}
antlrcpp::Any CypherMainVisitor::visitStorageModeQuery(MemgraphCypher::StorageModeQueryContext *ctx) {
auto *storage_mode_query = storage_->Create<StorageModeQuery>();
storage_mode_query->storage_mode_ = std::invoke([mode = ctx->storageMode()]() {
if (mode->IN_MEMORY_ANALYTICAL()) {
return StorageModeQuery::StorageMode::IN_MEMORY_ANALYTICAL;
}
return StorageModeQuery::StorageMode::IN_MEMORY_TRANSACTIONAL;
});
query_ = storage_mode_query;
return storage_mode_query;
}
antlrcpp::Any CypherMainVisitor::visitCreateSnapshotQuery(MemgraphCypher::CreateSnapshotQueryContext *ctx) {
query_ = storage_->Create<CreateSnapshotQuery>();
return query_;
@ -1511,6 +1525,7 @@ antlrcpp::Any CypherMainVisitor::visitPrivilege(MemgraphCypher::PrivilegeContext
if (ctx->MODULE_WRITE()) return AuthQuery::Privilege::MODULE_WRITE;
if (ctx->WEBSOCKET()) return AuthQuery::Privilege::WEBSOCKET;
if (ctx->TRANSACTION_MANAGEMENT()) return AuthQuery::Privilege::TRANSACTION_MANAGEMENT;
if (ctx->STORAGE_MODE()) return AuthQuery::Privilege::STORAGE_MODE;
LOG_FATAL("Should not get here - unknown privilege!");
}

View File

@ -263,6 +263,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitIsolationLevelQuery(MemgraphCypher::IsolationLevelQueryContext *ctx) override;
/**
* @return StorageModeQuery*
*/
antlrcpp::Any visitStorageModeQuery(MemgraphCypher::StorageModeQueryContext *ctx) override;
/**
* @return CreateSnapshotQuery*
*/

View File

@ -59,6 +59,8 @@ memgraphCypherKeyword : cypherKeyword
| HEADER
| IDENTIFIED
| ISOLATION
| IN_MEMORY_ANALYTICAL
| IN_MEMORY_TRANSACTIONAL
| KAFKA
| LABELS
| LEVEL
@ -88,6 +90,7 @@ memgraphCypherKeyword : cypherKeyword
| SNAPSHOT
| START
| STATS
| STORAGE
| STREAM
| STREAMS
| SYNC
@ -127,6 +130,7 @@ query : cypherQuery
| freeMemoryQuery
| triggerQuery
| isolationLevelQuery
| storageModeQuery
| createSnapshotQuery
| streamQuery
| settingQuery
@ -277,6 +281,7 @@ privilege : CREATE
| MODULE_WRITE
| WEBSOCKET
| TRANSACTION_MANAGEMENT
| STORAGE_MODE
;
granularPrivilege : NOTHING | READ | UPDATE | CREATE_DELETE ;
@ -354,6 +359,10 @@ isolationLevelScope : GLOBAL | SESSION | NEXT ;
isolationLevelQuery : SET isolationLevelScope TRANSACTION ISOLATION LEVEL isolationLevel ;
storageMode : IN_MEMORY_ANALYTICAL | IN_MEMORY_TRANSACTIONAL ;
storageModeQuery : STORAGE MODE storageMode ;
createSnapshotQuery : CREATE SNAPSHOT ;
streamName : symbolicName ;

View File

@ -25,103 +25,107 @@ import CypherLexer ;
UNDERSCORE : '_' ;
AFTER : A F T E R ;
ALTER : A L T E R ;
ANALYZE : A N A L Y Z E ;
ASYNC : A S Y N C ;
AUTH : A U T H ;
BAD : B A D ;
BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ;
BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ;
BATCH_SIZE : B A T C H UNDERSCORE S I Z E ;
BEFORE : B E F O R E ;
BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ;
CALL : C A L L ;
CHECK : C H E C K ;
CLEAR : C L E A R ;
COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ;
CONFIGS : C O N F I G S;
CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ;
CREATE_DELETE : C R E A T E UNDERSCORE D E L E T E ;
CREDENTIALS : C R E D E N T I A L S ;
CSV : C S V ;
DATA : D A T A ;
DELIMITER : D E L I M I T E R ;
DATABASE : D A T A B A S E ;
DENY : D E N Y ;
DIRECTORY : D I R E C T O R Y ;
DROP : D R O P ;
DUMP : D U M P ;
DURABILITY : D U R A B I L I T Y ;
EDGE_TYPES : E D G E UNDERSCORE T Y P E S ;
EXECUTE : E X E C U T E ;
FOR : F O R ;
FOREACH : F O R E A C H;
FREE : F R E E ;
FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ;
FROM : F R O M ;
GLOBAL : G L O B A L ;
GRANT : G R A N T ;
GRAPH : G R A P H ;
GRANTS : G R A N T S ;
HEADER : H E A D E R ;
IDENTIFIED : I D E N T I F I E D ;
IGNORE : I G N O R E ;
ISOLATION : I S O L A T I O N ;
KAFKA : K A F K A ;
LABELS : L A B E L S ;
LEVEL : L E V E L ;
LOAD : L O A D ;
LOCK : L O C K ;
MAIN : M A I N ;
MODE : M O D E ;
MODULE_READ : M O D U L E UNDERSCORE R E A D ;
MODULE_WRITE : M O D U L E UNDERSCORE W R I T E ;
NEXT : N E X T ;
NO : N O ;
NOTHING : N O T H I N G ;
PASSWORD : P A S S W O R D ;
PORT : P O R T ;
PRIVILEGES : P R I V I L E G E S ;
PULSAR : P U L S A R ;
READ : R E A D ;
READ_FILE : R E A D UNDERSCORE F I L E ;
REGISTER : R E G I S T E R ;
REPLICA : R E P L I C A ;
REPLICAS : R E P L I C A S ;
REPLICATION : R E P L I C A T I O N ;
REVOKE : R E V O K E ;
ROLE : R O L E ;
ROLES : R O L E S ;
QUOTE : Q U O T E ;
SERVICE_URL : S E R V I C E UNDERSCORE U R L ;
SESSION : S E S S I O N ;
SETTING : S E T T I N G ;
SETTINGS : S E T T I N G S ;
SNAPSHOT : S N A P S H O T ;
START : S T A R T ;
STATISTICS : S T A T I S T I C S ;
STATS : S T A T S ;
STOP : S T O P ;
STREAM : S T R E A M ;
STREAMS : S T R E A M S ;
SYNC : S Y N C ;
TERMINATE : T E R M I N A T E ;
TIMEOUT : T I M E O U T ;
TO : T O ;
TOPICS : T O P I C S;
TRANSACTION : T R A N S A C T I O N ;
TRANSACTION_MANAGEMENT : T R A N S A C T I O N UNDERSCORE M A N A G E M E N T ;
TRANSACTIONS : T R A N S A C T I O N S ;
TRANSFORM : T R A N S F O R M ;
TRIGGER : T R I G G E R ;
TRIGGERS : T R I G G E R S ;
UNCOMMITTED : U N C O M M I T T E D ;
UNLOCK : U N L O C K ;
UPDATE : U P D A T E ;
USER : U S E R ;
USERS : U S E R S ;
VERSION : V E R S I O N ;
WEBSOCKET : W E B S O C K E T ;
AFTER : A F T E R ;
ALTER : A L T E R ;
ANALYZE : A N A L Y Z E ;
ASYNC : A S Y N C ;
AUTH : A U T H ;
BAD : B A D ;
BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ;
BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ;
BATCH_SIZE : B A T C H UNDERSCORE S I Z E ;
BEFORE : B E F O R E ;
BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ;
CALL : C A L L ;
CHECK : C H E C K ;
CLEAR : C L E A R ;
COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ;
CONFIGS : C O N F I G S;
CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ;
CREATE_DELETE : C R E A T E UNDERSCORE D E L E T E ;
CREDENTIALS : C R E D E N T I A L S ;
CSV : C S V ;
DATA : D A T A ;
DELIMITER : D E L I M I T E R ;
DATABASE : D A T A B A S E ;
DENY : D E N Y ;
DIRECTORY : D I R E C T O R Y ;
DROP : D R O P ;
DUMP : D U M P ;
DURABILITY : D U R A B I L I T Y ;
EDGE_TYPES : E D G E UNDERSCORE T Y P E S ;
EXECUTE : E X E C U T E ;
FOR : F O R ;
FOREACH : F O R E A C H;
FREE : F R E E ;
FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ;
FROM : F R O M ;
GLOBAL : G L O B A L ;
GRANT : G R A N T ;
GRAPH : G R A P H ;
GRANTS : G R A N T S ;
HEADER : H E A D E R ;
IDENTIFIED : I D E N T I F I E D ;
IGNORE : I G N O R E ;
ISOLATION : I S O L A T I O N ;
IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ;
IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ;
KAFKA : K A F K A ;
LABELS : L A B E L S ;
LEVEL : L E V E L ;
LOAD : L O A D ;
LOCK : L O C K ;
MAIN : M A I N ;
MODE : M O D E ;
MODULE_READ : M O D U L E UNDERSCORE R E A D ;
MODULE_WRITE : M O D U L E UNDERSCORE W R I T E ;
NEXT : N E X T ;
NO : N O ;
NOTHING : N O T H I N G ;
PASSWORD : P A S S W O R D ;
PORT : P O R T ;
PRIVILEGES : P R I V I L E G E S ;
PULSAR : P U L S A R ;
READ : R E A D ;
READ_FILE : R E A D UNDERSCORE F I L E ;
REGISTER : R E G I S T E R ;
REPLICA : R E P L I C A ;
REPLICAS : R E P L I C A S ;
REPLICATION : R E P L I C A T I O N ;
REVOKE : R E V O K E ;
ROLE : R O L E ;
ROLES : R O L E S ;
QUOTE : Q U O T E ;
SERVICE_URL : S E R V I C E UNDERSCORE U R L ;
SESSION : S E S S I O N ;
SETTING : S E T T I N G ;
SETTINGS : S E T T I N G S ;
SNAPSHOT : S N A P S H O T ;
START : S T A R T ;
STATISTICS : S T A T I S T I C S ;
STATS : S T A T S ;
STOP : S T O P ;
STORAGE : S T O R A G E;
STORAGE_MODE : S T O R A G E UNDERSCORE MODE;
STREAM : S T R E A M ;
STREAMS : S T R E A M S ;
SYNC : S Y N C ;
TERMINATE : T E R M I N A T E ;
TIMEOUT : T I M E O U T ;
TO : T O ;
TOPICS : T O P I C S;
TRANSACTION : T R A N S A C T I O N ;
TRANSACTION_MANAGEMENT : T R A N S A C T I O N UNDERSCORE M A N A G E M E N T ;
TRANSACTIONS : T R A N S A C T I O N S ;
TRANSFORM : T R A N S F O R M ;
TRIGGER : T R I G G E R ;
TRIGGERS : T R I G G E R S ;
UNCOMMITTED : U N C O M M I T T E D ;
UNLOCK : U N L O C K ;
UPDATE : U P D A T E ;
USER : U S E R ;
USERS : U S E R S ;
VERSION : V E R S I O N ;
WEBSOCKET : W E B S O C K E T ;

View File

@ -78,6 +78,8 @@ class PrivilegeExtractor : public QueryVisitor<void>, public HierarchicalTreeVis
void Visit(IsolationLevelQuery &isolation_level_query) override { AddPrivilege(AuthQuery::Privilege::CONFIG); }
void Visit(StorageModeQuery & /*storage_mode_query*/) override { AddPrivilege(AuthQuery::Privilege::STORAGE_MODE); }
void Visit(CreateSnapshotQuery &create_snapshot_query) override { AddPrivilege(AuthQuery::Privilege::DURABILITY); }
void Visit(SettingQuery & /*setting_query*/) override { AddPrivilege(AuthQuery::Privilege::CONFIG); }

View File

@ -208,7 +208,10 @@ const trie::Trie kKeywords = {"union",
"websocket",
"foreach",
"labels",
"edge_types"};
"edge_types",
"off",
"in_memory_transactional",
"in_memory_analytical"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(

View File

@ -1169,7 +1169,6 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
if (in_explicit_transaction_) {
throw ExplicitTransactionUsageException("Nested transactions are not supported.");
}
in_explicit_transaction_ = true;
expect_rollback_ = false;
@ -2000,7 +1999,16 @@ constexpr auto ToStorageIsolationLevel(const IsolationLevelQuery::IsolationLevel
}
}
PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
constexpr auto ToStorageMode(const StorageModeQuery::StorageMode storage_mode) noexcept {
switch (storage_mode) {
case StorageModeQuery::StorageMode::IN_MEMORY_TRANSACTIONAL:
return storage::StorageMode::IN_MEMORY_TRANSACTIONAL;
case StorageModeQuery::StorageMode::IN_MEMORY_ANALYTICAL:
return storage::StorageMode::IN_MEMORY_ANALYTICAL;
}
}
PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
InterpreterContext *interpreter_context, Interpreter *interpreter) {
if (in_explicit_transaction) {
throw IsolationLevelModificationInMulticommandTxException();
@ -2015,7 +2023,15 @@ PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_expli
interpreter]() -> std::function<void()> {
switch (isolation_level_query->isolation_level_scope_) {
case IsolationLevelQuery::IsolationLevelScope::GLOBAL:
return [interpreter_context, isolation_level] { interpreter_context->db->SetIsolationLevel(isolation_level); };
return [interpreter_context, isolation_level] {
if (auto maybe_error = interpreter_context->db->SetIsolationLevel(isolation_level); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case storage::Storage::SetIsolationLevelError::DisabledForAnalyticalMode:
throw IsolationLevelModificationInAnalyticsException();
break;
}
}
};
case IsolationLevelQuery::IsolationLevelScope::SESSION:
return [interpreter, isolation_level] { interpreter->SetSessionIsolationLevel(isolation_level); };
case IsolationLevelQuery::IsolationLevelScope::NEXT:
@ -2033,6 +2049,41 @@ PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_expli
RWType::NONE};
}
PreparedQuery PrepareStorageModeQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
InterpreterContext *interpreter_context) {
if (in_explicit_transaction) {
throw StorageModeModificationInMulticommandTxException();
}
auto *storage_mode_query = utils::Downcast<StorageModeQuery>(parsed_query.query);
MG_ASSERT(storage_mode_query);
const auto storage_mode = ToStorageMode(storage_mode_query->storage_mode_);
auto exists_active_transaction = interpreter_context->interpreters.WithLock([](const auto &interpreters_) {
return std::any_of(interpreters_.begin(), interpreters_.end(), [](const auto &interpreter) {
return interpreter->transaction_status_.load() != TransactionStatus::IDLE;
});
});
if (exists_active_transaction) {
spdlog::info(
"Storage mode will be modified when there are no other active transactions. Check the status of the "
"transactions using 'SHOW TRANSACTIONS' query and ensure no other transactions are active.");
}
auto callback = [storage_mode, interpreter_context]() -> std::function<void()> {
return [interpreter_context, storage_mode] { interpreter_context->db->SetStorageMode(storage_mode); };
}();
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[callback = std::move(callback)](AnyStream * /*stream*/,
std::optional<int> /*n*/) -> std::optional<QueryHandlerResult> {
callback();
return QueryHandlerResult::COMMIT;
},
RWType::NONE};
}
PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
InterpreterContext *interpreter_context) {
if (in_explicit_transaction) {
@ -2043,11 +2094,18 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
{},
std::move(parsed_query.required_privileges),
[interpreter_context](AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
if (auto maybe_error = interpreter_context->db->CreateSnapshot(); maybe_error.HasError()) {
if (auto maybe_error = interpreter_context->db->CreateSnapshot({}); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case storage::Storage::CreateSnapshotError::DisabledForReplica:
throw utils::BasicException(
"Failed to create a snapshot. Replica instances are not allowed to create them.");
case storage::Storage::CreateSnapshotError::DisabledForAnalyticsPeriodicCommit:
spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.",
"https://memgr.ph/replication"));
break;
case storage::Storage::CreateSnapshotError::ReachedMaxNumTries:
spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support");
break;
}
}
return QueryHandlerResult::COMMIT;
@ -2769,6 +2827,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_);
} else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
} else if (utils::Downcast<StorageModeQuery>(parsed_query.query)) {
prepared_query = PrepareStorageModeQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
} else if (utils::Downcast<TransactionQueueQuery>(parsed_query.query)) {
prepared_query = PrepareTransactionQueueQuery(std::move(parsed_query), username_, in_explicit_transaction_,
interpreter_context_, &*execution_db_accessor_);

View File

@ -117,6 +117,7 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
// current code always follows the logical pattern of "create a delta" and
// "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value);
edge_.ptr->properties.SetProperty(property, value);

View File

@ -12,6 +12,7 @@
#pragma once
#include <atomic>
#include "storage/v2/property_value.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/view.hpp"
@ -95,6 +96,9 @@ inline bool PrepareForWrite(Transaction *transaction, TObj *object) {
/// a `DELETE_OBJECT` delta).
/// @throw std::bad_alloc
inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
if (transaction->storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
return nullptr;
}
transaction->EnsureCommitTimestampExists();
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(),
transaction->command_id);
@ -105,6 +109,9 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
/// @throw std::bad_alloc
template <typename TObj, class... Args>
inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&...args) {
if (transaction->storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
return;
}
transaction->EnsureCommitTimestampExists();
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)..., transaction->commit_timestamp.get(),
transaction->command_id);

View File

@ -31,6 +31,7 @@
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/file.hpp"
@ -316,6 +317,7 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const {
Storage::Storage(Config config)
: indices_(&constraints_, config.items),
isolation_level_(config.transaction.isolation_level),
storage_mode_(StorageMode::IN_MEMORY_TRANSACTIONAL),
config_(config),
snapshot_directory_(config_.durability.storage_directory / durability::kSnapshotDirectory),
wal_directory_(config_.durability.storage_directory / durability::kWalDirectory),
@ -398,12 +400,19 @@ Storage::Storage(Config config)
}
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] {
if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) {
if (auto maybe_error = this->CreateSnapshot({true}); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case CreateSnapshotError::DisabledForReplica:
spdlog::warn(
utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication"));
break;
case CreateSnapshotError::DisabledForAnalyticsPeriodicCommit:
spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.",
"https://memgr.ph/durability"));
break;
case storage::Storage::CreateSnapshotError::ReachedMaxNumTries:
spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support");
break;
}
}
});
@ -446,23 +455,30 @@ Storage::~Storage() {
snapshot_runner_.Stop();
}
if (config_.durability.snapshot_on_exit) {
if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) {
if (auto maybe_error = this->CreateSnapshot({false}); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case CreateSnapshotError::DisabledForReplica:
spdlog::warn(utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication"));
break;
case CreateSnapshotError::DisabledForAnalyticsPeriodicCommit:
spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.",
"https://memgr.ph/replication"));
break;
case storage::Storage::CreateSnapshotError::ReachedMaxNumTries:
spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support");
break;
}
}
}
}
Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level)
Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level, StorageMode storage_mode)
: storage_(storage),
// The lock must be acquired before creating the transaction object to
// prevent freshly created transactions from dangling in an active state
// during exclusive operations.
storage_guard_(storage_->main_lock_),
transaction_(storage->CreateTransaction(isolation_level)),
transaction_(storage->CreateTransaction(isolation_level, storage_mode)),
is_transaction_active_(true),
config_(storage->config_.items) {}
@ -490,11 +506,16 @@ VertexAccessor Storage::Accessor::CreateVertex() {
OOMExceptionEnabler oom_exception;
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
auto acc = storage_->vertices_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
MG_ASSERT(inserted, "The vertex must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!");
delta->prev.Set(&*it);
if (delta) {
delta->prev.Set(&*it);
}
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_);
}
@ -509,11 +530,14 @@ VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) {
storage_->vertex_id_.store(std::max(storage_->vertex_id_.load(std::memory_order_acquire), gid.AsUint() + 1),
std::memory_order_release);
auto acc = storage_->vertices_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Vertex{gid, delta});
MG_ASSERT(inserted, "The vertex must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!");
delta->prev.Set(&*it);
if (delta) {
delta->prev.Set(&*it);
}
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_);
}
@ -656,12 +680,15 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
EdgeRef edge(gid);
if (config_.properties_on_edges) {
auto acc = storage_->edges_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Edge(gid, delta));
MG_ASSERT(inserted, "The edge must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Edge accessor!");
edge = EdgeRef(&*it);
delta->prev.Set(&*it);
if (delta) {
delta->prev.Set(&*it);
}
}
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
@ -724,12 +751,15 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
EdgeRef edge(gid);
if (config_.properties_on_edges) {
auto acc = storage_->edges_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Edge(gid, delta));
MG_ASSERT(inserted, "The edge must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Edge accessor!");
edge = EdgeRef(&*it);
delta->prev.Set(&*it);
if (delta) {
delta->prev.Set(&*it);
}
}
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
@ -1380,7 +1410,7 @@ VerticesIterable Storage::Accessor::Vertices(LabelId label, PropertyId property,
storage_->indices_.label_property_index.Vertices(label, property, lower_bound, upper_bound, view, &transaction_));
}
Transaction Storage::CreateTransaction(IsolationLevel isolation_level) {
Transaction Storage::CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) {
// We acquire the transaction engine lock here because we access (and
// modify) the transaction engine variables (`transaction_id` and
// `timestamp`) below.
@ -1401,7 +1431,7 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level) {
start_timestamp = timestamp_++;
}
}
return {transaction_id, start_timestamp, isolation_level};
return {transaction_id, start_timestamp, isolation_level, storage_mode};
}
template <bool force>
@ -1907,28 +1937,48 @@ bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation opera
return finalized_on_all_replicas;
}
utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot() {
utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot(std::optional<bool> is_periodic) {
if (replication_role_.load() != ReplicationRole::MAIN) {
return CreateSnapshotError::DisabledForReplica;
}
auto snapshot_creator = [this]() {
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_);
// Create snapshot.
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_, &name_id_mapper_,
&indices_, &constraints_, config_.items, uuid_, epoch_id_, epoch_history_,
&file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);
};
std::lock_guard snapshot_guard(snapshot_lock_);
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
auto should_try_shared{true};
auto max_num_tries{10};
while (max_num_tries) {
if (should_try_shared) {
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
snapshot_creator();
return {};
}
} else {
std::unique_lock main_guard{main_lock_};
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) {
if (is_periodic && *is_periodic) {
return CreateSnapshotError::DisabledForAnalyticsPeriodicCommit;
}
snapshot_creator();
return {};
}
}
should_try_shared = !should_try_shared;
max_num_tries--;
}
// Create the transaction used to create the snapshot.
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION);
// Create snapshot.
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_, &name_id_mapper_,
&indices_, &constraints_, config_.items, uuid_, epoch_id_, epoch_history_,
&file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);
return {};
return CreateSnapshotError::ReachedMaxNumTries;
}
bool Storage::LockPath() {
@ -2114,11 +2164,23 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
});
}
void Storage::SetIsolationLevel(IsolationLevel isolation_level) {
utils::BasicResult<Storage::SetIsolationLevelError> Storage::SetIsolationLevel(IsolationLevel isolation_level) {
std::unique_lock main_guard{main_lock_};
if (storage_mode_ == storage::StorageMode::IN_MEMORY_ANALYTICAL) {
return Storage::SetIsolationLevelError::DisabledForAnalyticalMode;
}
isolation_level_ = isolation_level;
return {};
}
void Storage::SetStorageMode(StorageMode storage_mode) {
std::unique_lock main_guard{main_lock_};
storage_mode_ = storage_mode;
}
StorageMode Storage::GetStorageMode() { return storage_mode_; }
void Storage::RestoreReplicas() {
MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole());
if (!ShouldStoreAndRestoreReplicas()) {

View File

@ -33,6 +33,7 @@
#include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/result.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
@ -200,7 +201,7 @@ class Storage final {
private:
friend class Storage;
explicit Accessor(Storage *storage, IsolationLevel isolation_level);
explicit Accessor(Storage *storage, IsolationLevel isolation_level, StorageMode storage_mode);
public:
Accessor(const Accessor &) = delete;
@ -368,7 +369,7 @@ class Storage final {
};
Accessor Access(std::optional<IsolationLevel> override_isolation_level = {}) {
return Accessor{this, override_isolation_level.value_or(isolation_level_)};
return Accessor{this, override_isolation_level.value_or(isolation_level_), storage_mode_};
}
const std::string &LabelToName(LabelId label) const;
@ -509,14 +510,24 @@ class Storage final {
void FreeMemory();
void SetIsolationLevel(IsolationLevel isolation_level);
enum class SetIsolationLevelError : uint8_t { DisabledForAnalyticalMode };
enum class CreateSnapshotError : uint8_t { DisabledForReplica };
utils::BasicResult<SetIsolationLevelError> SetIsolationLevel(IsolationLevel isolation_level);
utils::BasicResult<CreateSnapshotError> CreateSnapshot();
void SetStorageMode(StorageMode storage_mode);
StorageMode GetStorageMode();
enum class CreateSnapshotError : uint8_t {
DisabledForReplica,
DisabledForAnalyticsPeriodicCommit,
ReachedMaxNumTries
};
utils::BasicResult<CreateSnapshotError> CreateSnapshot(std::optional<bool> is_periodic);
private:
Transaction CreateTransaction(IsolationLevel isolation_level);
Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode);
/// The force parameter determines the behaviour of the garbage collector.
/// If it's set to true, it will behave as a global operation, i.e. it can't
@ -582,6 +593,7 @@ class Storage final {
utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_;
IsolationLevel isolation_level_;
StorageMode storage_mode_;
Config config_;
utils::Scheduler gc_runner_;

View File

@ -0,0 +1,9 @@
#pragma once
#include <cstdint>
namespace memgraph::storage {
enum class StorageMode : std::uint8_t { IN_MEMORY_ANALYTICAL, IN_MEMORY_TRANSACTIONAL };
} // namespace memgraph::storage

View File

@ -22,6 +22,7 @@
#include "storage/v2/edge.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/view.hpp"
@ -31,12 +32,14 @@ const uint64_t kTimestampInitialId = 0;
const uint64_t kTransactionInitialId = 1ULL << 63U;
struct Transaction {
Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level)
Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level,
StorageMode storage_mode)
: transaction_id(transaction_id),
start_timestamp(start_timestamp),
command_id(0),
must_abort(false),
isolation_level(isolation_level) {}
isolation_level(isolation_level),
storage_mode(storage_mode) {}
Transaction(Transaction &&other) noexcept
: transaction_id(other.transaction_id.load(std::memory_order_acquire)),
@ -45,7 +48,8 @@ struct Transaction {
command_id(other.command_id),
deltas(std::move(other.deltas)),
must_abort(other.must_abort),
isolation_level(other.isolation_level) {}
isolation_level(other.isolation_level),
storage_mode(other.storage_mode) {}
Transaction(const Transaction &) = delete;
Transaction &operator=(const Transaction &) = delete;
@ -70,6 +74,7 @@ struct Transaction {
std::list<Delta> deltas;
bool must_abort;
IsolationLevel isolation_level;
StorageMode storage_mode;
};
inline bool operator==(const Transaction &first, const Transaction &second) {

View File

@ -222,6 +222,7 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
// current code always follows the logical pattern of "create a delta" and
// "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value);
vertex_->properties.SetProperty(property, value);

View File

@ -171,6 +171,7 @@ enum class TypeId : uint64_t {
AST_FREE_MEMORY_QUERY,
AST_TRIGGER_QUERY,
AST_ISOLATION_LEVEL_QUERY,
AST_STORAGE_MODE_QUERY,
AST_CREATE_SNAPSHOT_QUERY,
AST_STREAM_QUERY,
AST_SETTING_QUERY,

View File

@ -37,6 +37,7 @@ BASIC_PRIVILEGES = [
"WEBSOCKET",
"MODULE_WRITE",
"TRANSACTION_MANAGEMENT",
"STORAGE_MODE",
]
@ -60,7 +61,7 @@ def test_lba_procedures_show_privileges_first_user():
cursor = connect(username="Josip", password="").cursor()
result = execute_and_fetch_all(cursor, "SHOW PRIVILEGES FOR Josip;")
assert len(result) == 31
assert len(result) == 32
fine_privilege_results = [res for res in result if res[0] not in BASIC_PRIVILEGES]

View File

@ -30,3 +30,6 @@ add_subdirectory(env_variable_check)
#flag check binaries
add_subdirectory(flag_check)
#flag check binaries
add_subdirectory(storage_mode)

View File

@ -0,0 +1,6 @@
set(target_name memgraph__integration__storage_mode)
set(tester_target_name ${target_name}__tester)
add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-communication)

View File

@ -0,0 +1,196 @@
import argparse
import atexit
import os
import subprocess
import sys
import tempfile
import time
from typing import List
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
node_queries = [
"CREATE (:Label {prop:'1'})",
"CREATE (:Label {prop:'2'})",
]
edge_queries = ["MATCH (l1:Label),(l2:Label) WHERE l1.prop = '1' AND l2.prop = '2' CREATE (l1)-[r:edgeType1]->(l2)"]
assertion_queries = [
f"MATCH (n) WITH count(n) as cnt RETURN assert(cnt={len(node_queries)});",
f"MATCH (n)-[e]->(m) WITH count(e) as cnt RETURN assert(cnt={len(edge_queries)});",
]
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def prepare_memgraph(memgraph_args):
# Start the memgraph binary
memgraph = subprocess.Popen(list(map(str, memgraph_args)))
time.sleep(0.1)
assert memgraph.poll() is None, "Memgraph process died prematurely!"
wait_for_server(7687)
return memgraph
def terminate_memgraph(memgraph):
memgraph.terminate()
time.sleep(0.1)
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
def execute_tester(
binary, queries, should_fail=False, failure_message="", username="", password="", check_failure=True
):
args = [binary, "--username", username, "--password", password]
if should_fail:
args.append("--should-fail")
if failure_message:
args.extend(["--failure-message", failure_message])
if check_failure:
args.append("--check-failure")
args.extend(queries)
subprocess.run(args).check_returncode()
def execute_test_analytical_mode(memgraph_binary: str, tester_binary: str) -> None:
def execute_queries(queries):
return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="")
storage_directory = tempfile.TemporaryDirectory()
memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name])
print("\033[1;36m~~ Starting creating & loading snapshot test ~~\033[0m")
execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"])
# Prepare all nodes
execute_queries(node_queries)
# Prepare all edges
execute_queries(edge_queries)
execute_queries(["CREATE SNAPSHOT;"])
print("\033[1;36m~~ Created snapshot ~~\033[0m\n")
# Shutdown the memgraph binary with wait
terminate_memgraph(memgraph)
# Start the memgraph binary
memgraph = prepare_memgraph(
[memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"]
)
execute_queries(assertion_queries)
memgraph.terminate()
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
def execute_test_switch_analytical_transactional(memgraph_binary: str, tester_binary: str) -> None:
def execute_queries(queries):
return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="")
storage_directory = tempfile.TemporaryDirectory()
# Start the memgraph binary
memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name])
print("\033[1;36m~~ Starting switch storage modes test ~~\033[0m")
# switch to IN_MEMORY_ANALYTICAL
execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"])
# Prepare all nodes
execute_queries(node_queries)
# switch back to IN_MEMORY_TRANSACTIONAL
execute_queries(["STORAGE MODE IN_MEMORY_TRANSACTIONAL"])
# Prepare all edges
execute_queries(edge_queries)
execute_queries(["CREATE SNAPSHOT;"])
print("\033[1;36m~~ Created snapshot ~~\033[0m\n")
print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n")
# Shutdown the memgraph binary with wait
terminate_memgraph(memgraph)
print("\033[1;36m~~ Starting memgraph with snapshot recovery ~~\033[0m\n")
memgraph = prepare_memgraph(
[memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"]
)
execute_queries(assertion_queries)
print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n")
memgraph.terminate()
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
def execute_test_switch_transactional_analytical(memgraph_binary: str, tester_binary: str) -> None:
def execute_queries(queries):
return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="")
storage_directory = tempfile.TemporaryDirectory()
# Start the memgraph binary
memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name])
print("\033[1;36m~~ Starting switch storage modes test ~~\033[0m")
# Prepare all nodes
execute_queries(node_queries)
# switch to IN_MEMORY_ANALYTICAL
execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"])
# Prepare all edges
execute_queries(edge_queries)
execute_queries(["CREATE SNAPSHOT;"])
print("\033[1;36m~~ Created snapshot ~~\033[0m\n")
print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n")
# Shutdown the memgraph binary with wait
terminate_memgraph(memgraph)
print("\033[1;36m~~ Starting memgraph with snapshot recovery ~~\033[0m\n")
memgraph = prepare_memgraph(
[memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"]
)
execute_queries(assertion_queries)
print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n")
memgraph.terminate()
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
if __name__ == "__main__":
memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
tester_binary = os.path.join(PROJECT_DIR, "build", "tests", "integration", "storage_mode", "tester")
parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--tester", default=tester_binary)
args = parser.parse_args()
execute_test_analytical_mode(args.memgraph, args.tester)
execute_test_switch_analytical_transactional(args.memgraph, args.tester)
execute_test_switch_transactional_analytical(args.memgraph, args.tester)
sys.exit(0)

View File

@ -0,0 +1,85 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gflags/gflags.h>
#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_bool(check_failure, false, "Set to true to enable failure checking.");
DEFINE_bool(should_fail, false, "Set to true to expect a failure.");
DEFINE_string(failure_message, "", "Set to the expected failure message.");
/**
* Executes queries passed as positional arguments and verifies whether they
* succeeded, failed, failed with a specific error message or executed without a
* specific error occurring.
*/
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
memgraph::communication::SSLInit sslInit;
memgraph::io::network::Endpoint endpoint(memgraph::io::network::ResolveHostname(FLAGS_address), FLAGS_port);
memgraph::communication::ClientContext context(FLAGS_use_ssl);
memgraph::communication::bolt::Client client(context);
client.Connect(endpoint, FLAGS_username, FLAGS_password);
for (int i = 1; i < argc; ++i) {
std::string query(argv[i]);
std::cout << query << std::endl;
try {
client.Execute(query, {});
} catch (const memgraph::communication::bolt::ClientQueryException &e) {
if (!FLAGS_check_failure) {
if (!FLAGS_failure_message.empty() && e.what() == FLAGS_failure_message) {
LOG_FATAL(
"The query should have succeeded or failed with an error "
"message that isn't equal to '{}' but it failed with that error "
"message",
FLAGS_failure_message);
}
continue;
}
if (FLAGS_should_fail) {
if (!FLAGS_failure_message.empty() && e.what() != FLAGS_failure_message) {
LOG_FATAL(
"The query should have failed with an error message of '{}'' but "
"instead it failed with '{}'",
FLAGS_failure_message, e.what());
}
return 0;
} else {
LOG_FATAL(
"The query shoudn't have failed but it failed with an "
"error message '{}'",
e.what());
}
}
if (!FLAGS_check_failure) continue;
if (FLAGS_should_fail) {
LOG_FATAL(
"The query should have failed but instead it executed "
"successfully!");
}
}
return 0;
}

View File

@ -319,7 +319,7 @@ add_unit_test(storage_v2_property_store.cpp)
target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2 fmt)
add_unit_test(storage_v2_wal_file.cpp)
target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 fmt)
target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 storage_test_utils fmt)
add_unit_test(storage_v2_replication.cpp)
target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
@ -327,6 +327,9 @@ target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
add_unit_test(storage_v2_isolation_level.cpp)
target_link_libraries(${test_prefix}storage_v2_isolation_level mg-storage-v2)
add_unit_test(storage_v2_storage_mode.cpp)
target_link_libraries(${test_prefix}storage_v2_storage_mode mg-storage-v2 storage_test_utils mg-query mg-glue)
add_unit_test(replication_persistence_helper.cpp)
target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2)

View File

@ -17,4 +17,13 @@ size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, mem
for (auto it = vertices.begin(); it != vertices.end(); ++it, ++count)
;
return count;
}
}
std::string_view StorageModeToString(memgraph::storage::StorageMode storage_mode) {
switch (storage_mode) {
case memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL:
return "IN_MEMORY_ANALYTICAL";
case memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL:
return "IN_MEMORY_TRANSACTIONAL";
}
}

View File

@ -14,4 +14,9 @@
#include "storage/v2/storage.hpp"
#include "storage/v2/view.hpp"
size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, memgraph::storage::View view);
size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, memgraph::storage::View view);
std::string_view StorageModeToString(memgraph::storage::StorageMode storage_mode);
inline constexpr std::array storage_modes{memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL,
memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL};

View File

@ -0,0 +1,134 @@
#include <gtest/gtest.h>
#include <chrono>
#include <stop_token>
#include <string>
#include <string_view>
#include <thread>
#include "interpreter_faker.hpp"
#include "query/exceptions.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage_test_utils.hpp"
class StorageModeTest : public ::testing::TestWithParam<memgraph::storage::StorageMode> {
public:
struct PrintStringParamToName {
std::string operator()(const testing::TestParamInfo<memgraph::storage::StorageMode> &info) {
return std::string(StorageModeToString(static_cast<memgraph::storage::StorageMode>(info.param)));
}
};
};
// you should be able to see nodes if there is analytics mode
TEST_P(StorageModeTest, Mode) {
const memgraph::storage::StorageMode storage_mode = GetParam();
memgraph::storage::Storage storage{
{.transaction{.isolation_level = memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION}}};
storage.SetStorageMode(storage_mode);
auto creator = storage.Access();
auto other_analytics_mode_reader = storage.Access();
ASSERT_EQ(CountVertices(creator, memgraph::storage::View::OLD), 0);
ASSERT_EQ(CountVertices(other_analytics_mode_reader, memgraph::storage::View::OLD), 0);
static constexpr int vertex_creation_count = 10;
{
for (size_t i = 1; i <= vertex_creation_count; i++) {
creator.CreateVertex();
int64_t expected_vertices_count = storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL ? i : 0;
ASSERT_EQ(CountVertices(creator, memgraph::storage::View::OLD), expected_vertices_count);
ASSERT_EQ(CountVertices(other_analytics_mode_reader, memgraph::storage::View::OLD), expected_vertices_count);
}
}
ASSERT_FALSE(creator.Commit().HasError());
}
INSTANTIATE_TEST_CASE_P(ParameterizedStorageModeTests, StorageModeTest, ::testing::ValuesIn(storage_modes),
StorageModeTest::PrintStringParamToName());
class StorageModeMultiTxTest : public ::testing::Test {
protected:
memgraph::storage::Storage db_;
std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "MG_tests_unit_storage_mode"};
memgraph::query::InterpreterContext interpreter_context{&db_, {}, data_directory};
InterpreterFaker running_interpreter{&interpreter_context}, main_interpreter{&interpreter_context};
};
TEST_F(StorageModeMultiTxTest, ModeSwitchInactiveTransaction) {
bool started = false;
std::jthread running_thread = std::jthread(
[this, &started](std::stop_token st, int thread_index) {
running_interpreter.Interpret("CREATE ();");
started = true;
},
0);
{
while (!started) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL);
main_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL");
// should change state
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL);
// finish thread
running_thread.request_stop();
}
}
TEST_F(StorageModeMultiTxTest, ModeSwitchActiveTransaction) {
// transactional state
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL);
main_interpreter.Interpret("BEGIN");
bool started = false;
bool finished = false;
std::jthread running_thread = std::jthread(
[this, &started, &finished](std::stop_token st, int thread_index) {
started = true;
// running interpreter try to change
running_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL");
finished = true;
},
0);
{
while (!started) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// should not change still
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL);
main_interpreter.Interpret("COMMIT");
while (!finished) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// should change state
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL);
// finish thread
running_thread.request_stop();
}
}
TEST_F(StorageModeMultiTxTest, ErrorChangeIsolationLevel) {
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL);
main_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL");
// should change state
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL);
ASSERT_THROW(running_interpreter.Interpret("SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;"),
memgraph::query::IsolationLevelModificationInAnalyticsException);
}

View File

@ -22,6 +22,7 @@
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage_test_utils.hpp"
#include "utils/file.hpp"
#include "utils/file_locker.hpp"
#include "utils/uuid.hpp"
@ -58,15 +59,18 @@ class DeltaGenerator final {
explicit Transaction(DeltaGenerator *gen)
: gen_(gen),
transaction_(gen->transaction_id_++, gen->timestamp_++,
memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION) {}
transaction_(gen->transaction_id_++, gen->timestamp_++, memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION,
gen->storage_mode_) {}
public:
memgraph::storage::Vertex *CreateVertex() {
auto gid = memgraph::storage::Gid::FromUint(gen_->vertices_count_++);
auto delta = memgraph::storage::CreateDeleteObjectDelta(&transaction_);
auto &it = gen_->vertices_.emplace_back(gid, delta);
delta->prev.Set(&it);
if (delta != nullptr) {
delta->prev.Set(&it);
}
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return &it;
{
memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_CREATE;
@ -78,6 +82,7 @@ class DeltaGenerator final {
void DeleteVertex(memgraph::storage::Vertex *vertex) {
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::RecreateObjectTag());
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
{
memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_DELETE;
@ -91,6 +96,7 @@ class DeltaGenerator final {
vertex->labels.push_back(label_id);
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::RemoveLabelTag(),
label_id);
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
{
memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_ADD_LABEL;
@ -104,6 +110,7 @@ class DeltaGenerator final {
auto label_id = memgraph::storage::LabelId::FromUint(gen_->mapper_.NameToId(label));
vertex->labels.erase(std::find(vertex->labels.begin(), vertex->labels.end(), label_id));
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::AddLabelTag(), label_id);
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
{
memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_REMOVE_LABEL;
@ -121,6 +128,7 @@ class DeltaGenerator final {
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::SetPropertyTag(),
property_id, old_value);
props.SetProperty(property_id, value);
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
{
memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY;
@ -136,6 +144,7 @@ class DeltaGenerator final {
void Finalize(bool append_transaction_end = true) {
auto commit_timestamp = gen_->timestamp_++;
if (transaction_.deltas.empty()) return;
for (const auto &delta : transaction_.deltas) {
auto owner = delta.prev.Get();
while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) {
@ -183,12 +192,14 @@ class DeltaGenerator final {
using DataT = std::vector<std::pair<uint64_t, memgraph::storage::durability::WalDeltaData>>;
DeltaGenerator(const std::filesystem::path &data_directory, bool properties_on_edges, uint64_t seq_num)
DeltaGenerator(const std::filesystem::path &data_directory, bool properties_on_edges, uint64_t seq_num,
memgraph::storage::StorageMode storage_mode = memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL)
: uuid_(memgraph::utils::GenerateUUID()),
epoch_id_(memgraph::utils::GenerateUUID()),
seq_num_(seq_num),
wal_file_(data_directory, uuid_, epoch_id_, {.properties_on_edges = properties_on_edges}, &mapper_, seq_num,
&file_retainer_) {}
&file_retainer_),
storage_mode_(storage_mode) {}
Transaction CreateTransaction() { return Transaction(this); }
@ -274,6 +285,8 @@ class DeltaGenerator final {
uint64_t valid_{true};
memgraph::utils::FileRetainer file_retainer_;
memgraph::storage::StorageMode storage_mode_;
};
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
@ -621,3 +634,64 @@ TEST_P(WalFileTest, PartialData) {
ASSERT_EQ(pos, infos.size() - 2);
AssertWalInfoEqual(infos[infos.size() - 1].second, memgraph::storage::durability::ReadWalInfo(current_file));
}
class StorageModeWalFileTest : public ::testing::TestWithParam<memgraph::storage::StorageMode> {
public:
StorageModeWalFileTest() {}
void SetUp() override { Clear(); }
void TearDown() override { Clear(); }
std::vector<std::filesystem::path> GetFilesList() {
std::vector<std::filesystem::path> ret;
for (auto &item : std::filesystem::directory_iterator(storage_directory)) {
ret.push_back(item.path());
}
std::sort(ret.begin(), ret.end());
std::reverse(ret.begin(), ret.end());
return ret;
}
std::filesystem::path storage_directory{std::filesystem::temp_directory_path() / "MG_test_unit_storage_v2_wal_file"};
struct PrintStringParamToName {
std::string operator()(const testing::TestParamInfo<memgraph::storage::StorageMode> &info) {
return std::string(StorageModeToString(static_cast<memgraph::storage::StorageMode>(info.param)));
}
};
private:
void Clear() {
if (!std::filesystem::exists(storage_directory)) return;
std::filesystem::remove_all(storage_directory);
}
};
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(StorageModeWalFileTest, StorageModeData) {
std::vector<std::pair<uint64_t, memgraph::storage::durability::WalInfo>> infos;
const memgraph::storage::StorageMode storage_mode = GetParam();
{
DeltaGenerator gen(storage_directory, true, 5, storage_mode);
auto tx = gen.CreateTransaction();
tx.CreateVertex();
tx.Finalize(true);
infos.emplace_back(gen.GetPosition(), gen.GetInfo());
size_t num_expected_deltas = storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL ? 0 : 2;
ASSERT_EQ(infos[0].second.num_deltas, num_expected_deltas);
auto wal_files = GetFilesList();
size_t num_expected_wal_files = 1;
ASSERT_EQ(num_expected_wal_files, wal_files.size());
if (storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) {
DeltaGenerator gen_empty(storage_directory, true, 5, storage_mode);
ASSERT_EQ(gen.GetPosition(), gen_empty.GetPosition());
}
}
}
INSTANTIATE_TEST_CASE_P(ParameterizedWalStorageModeTests, StorageModeWalFileTest, ::testing::ValuesIn(storage_modes),
StorageModeWalFileTest::PrintStringParamToName());