Merge branch 'master' into MG-parallelize-recovery

This commit is contained in:
János Benjamin Antal 2023-04-12 17:25:45 +02:00
commit c2f3a92eca
46 changed files with 1437 additions and 289 deletions

View File

@ -2,7 +2,12 @@ name: Package All
# TODO(gitbuda): Cleanup docker container if GHA job was canceled. # TODO(gitbuda): Cleanup docker container if GHA job was canceled.
on: workflow_dispatch on:
workflow_dispatch:
inputs:
memgraph_version:
description: "Memgraph version to upload as. If empty upload is skipped. Format: 'vX.Y.Z'"
required: false
jobs: jobs:
centos-7: centos-7:
@ -106,7 +111,7 @@ jobs:
- name: "Upload package" - name: "Upload package"
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3
with: with:
name: ubuntu-1804 name: ubuntu-18.04
path: build/output/ubuntu-18.04/memgraph*.deb path: build/output/ubuntu-18.04/memgraph*.deb
ubuntu-2004: ubuntu-2004:
@ -123,7 +128,7 @@ jobs:
- name: "Upload package" - name: "Upload package"
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3
with: with:
name: ubuntu-2004 name: ubuntu-20.04
path: build/output/ubuntu-20.04/memgraph*.deb path: build/output/ubuntu-20.04/memgraph*.deb
ubuntu-2204: ubuntu-2204:
@ -140,7 +145,7 @@ jobs:
- name: "Upload package" - name: "Upload package"
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3
with: with:
name: ubuntu-2204 name: ubuntu-22.04
path: build/output/ubuntu-22.04/memgraph*.deb path: build/output/ubuntu-22.04/memgraph*.deb
debian-11-platform: debian-11-platform:
@ -208,7 +213,7 @@ jobs:
- name: "Upload package" - name: "Upload package"
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3
with: with:
name: debian-11-arm name: debian-11-aarch64
path: build/output/debian-11-arm/memgraph*.deb path: build/output/debian-11-arm/memgraph*.deb
ubuntu-2204-arm: ubuntu-2204-arm:
@ -225,5 +230,27 @@ jobs:
- name: "Upload package" - name: "Upload package"
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3
with: with:
name: ubuntu-22.04-arm name: ubuntu-22.04-aarch64
path: build/output/ubuntu-22.04-arm/memgraph*.deb path: build/output/ubuntu-22.04-arm/memgraph*.deb
upload-to-s3:
# only run upload if we specified version. Allows for runs without upload
if: "${{ github.event.inputs.memgraph_version != '' }}"
needs: [centos-7, centos-9, debian-10, debian-11, docker, ubuntu-1804, ubuntu-2004, ubuntu-2204, debian-11-platform, fedora-36, amzn-2, debian-11-arm, ubuntu-2204-arm]
runs-on: ubuntu-latest
steps:
- name: Download artifacts
uses: actions/download-artifact@v3
with:
# name: # if name input parameter is not provided, all artifacts are downloaded
# and put in directories named after each one.
path: build/output/release
- name: Upload to S3
uses: jakejarvis/s3-sync-action@v0.5.1
env:
AWS_S3_BUCKET: "download.memgraph.com"
AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: "eu-west-1"
SOURCE_DIR: "build/output/release"
DEST_DIR: "memgraph/${{ github.event.inputs.memgraph_version }}/"

View File

@ -31,8 +31,8 @@ jobs:
- name: Log in to Docker Hub - name: Log in to Docker Hub
uses: docker/login-action@v2 uses: docker/login-action@v2
with: with:
username: ${{ secrets.DOCKER_USERNAME }} username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Download memgraph binary - name: Download memgraph binary
run: | run: |

2
init
View File

@ -109,7 +109,7 @@ done;
# Install precommit hook except on old operating systems because we don't # Install precommit hook except on old operating systems because we don't
# develop on them -> pre-commit hook not required -> we can use latest # develop on them -> pre-commit hook not required -> we can use latest
# packages. # packages.
if [ "${DISTRO}" != "centos-7" ] && [ "$DISTRO" != "debian-10" ] && [ "${DISTRO}" != "ubuntu-18.04" ]; then if [ "${DISTRO}" != "centos-7" ] && [ "$DISTRO" != "debian-10" ] && [ "${DISTRO}" != "ubuntu-18.04" ] && [ "${DISTRO}" != "amzn-2" ]; then
python3 -m pip install pre-commit python3 -m pip install pre-commit
python3 -m pre_commit install python3 -m pre_commit install
# Install py format tools for usage during the development. # Install py format tools for usage during the development.

View File

@ -36,7 +36,7 @@ ADDITIONAL USE GRANT: You may use the Licensed Work in accordance with the
3. using the Licensed Work to create a work or solution 3. using the Licensed Work to create a work or solution
which competes (or might reasonably be expected to which competes (or might reasonably be expected to
compete) with the Licensed Work. compete) with the Licensed Work.
CHANGE DATE: 2027-08-03 CHANGE DATE: 2027-05-04
CHANGE LICENSE: Apache License, Version 2.0 CHANGE LICENSE: Apache License, Version 2.0
For information about alternative licensing arrangements, please visit: https://memgraph.com/legal. For information about alternative licensing arrangements, please visit: https://memgraph.com/legal.

View File

@ -16,6 +16,10 @@ target_link_libraries(mg-auth mg-utils mg-kvstore mg-license )
target_link_libraries(mg-auth ${Seccomp_LIBRARIES}) target_link_libraries(mg-auth ${Seccomp_LIBRARIES})
target_include_directories(mg-auth SYSTEM PRIVATE ${Seccomp_INCLUDE_DIRS}) target_include_directories(mg-auth SYSTEM PRIVATE ${Seccomp_INCLUDE_DIRS})
find_package(OpenSSL REQUIRED)
target_link_libraries(mg-auth ${OPENSSL_LIBRARIES})
target_include_directories(mg-auth SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR})
# Install reference auth modules and their configuration files. # Install reference auth modules and their configuration files.
install(PROGRAMS ${CMAKE_CURRENT_SOURCE_DIR}/reference_modules/example.py install(PROGRAMS ${CMAKE_CURRENT_SOURCE_DIR}/reference_modules/example.py
DESTINATION lib/memgraph/auth_module) DESTINATION lib/memgraph/auth_module)

View File

@ -1,19 +1,64 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Licensed as a Memgraph Enterprise file under the Memgraph Enterprise // Licensed as a Memgraph Enterprise file under the Memgraph Enterprise
// License (the "License"); by using this file, you agree to be bound by the terms of the License, and you may not use // License (the "License"); by using this file, you agree to be bound by the terms of the License, and you may not use
// this file except in compliance with the License. You may obtain a copy of the License at https://memgraph.com/legal. // this file except in compliance with the License. You may obtain a copy of the License at https://memgraph.com/legal.
// //
// //
#include "auth/crypto.hpp" #include "auth/crypto.hpp"
#include <iomanip>
#include <sstream>
#include <gflags/gflags.h>
#include <libbcrypt/bcrypt.h> #include <libbcrypt/bcrypt.h>
#include <openssl/evp.h>
#include <openssl/opensslv.h>
#include <openssl/sha.h>
#include "auth/exceptions.hpp" #include "auth/exceptions.hpp"
#include "utils/enum.hpp"
#include "utils/flag_validation.hpp"
namespace {
using namespace std::literals;
inline constexpr std::array password_encryption_mappings{
std::pair{"bcrypt"sv, memgraph::auth::PasswordEncryptionAlgorithm::BCRYPT},
std::pair{"sha256"sv, memgraph::auth::PasswordEncryptionAlgorithm::SHA256},
std::pair{"sha256-multiple"sv, memgraph::auth::PasswordEncryptionAlgorithm::SHA256_MULTIPLE}};
inline constexpr uint64_t ONE_SHA_ITERATION = 1;
inline constexpr uint64_t MULTIPLE_SHA_ITERATIONS = 1024;
} // namespace
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables,misc-unused-parameters)
DEFINE_VALIDATED_string(password_encryption_algorithm, "bcrypt",
"The password encryption algorithm used for authentication.", {
if (const auto result =
memgraph::utils::IsValidEnumValueString(value, password_encryption_mappings);
result.HasError()) {
const auto error = result.GetError();
switch (error) {
case memgraph::utils::ValidationError::EmptyValue: {
std::cout << "Password encryption algorithm cannot be empty." << std::endl;
break;
}
case memgraph::utils::ValidationError::InvalidValue: {
std::cout << "Invalid value for password encryption algorithm. Allowed values: "
<< memgraph::utils::GetAllowedEnumValuesString(password_encryption_mappings)
<< std::endl;
break;
}
}
return false;
}
return true;
});
namespace memgraph::auth { namespace memgraph::auth {
const std::string EncryptPassword(const std::string &password) { namespace BCrypt {
std::string EncryptPassword(const std::string &password) {
char salt[BCRYPT_HASHSIZE]; char salt[BCRYPT_HASHSIZE];
char hash[BCRYPT_HASHSIZE]; char hash[BCRYPT_HASHSIZE];
@ -28,7 +73,7 @@ const std::string EncryptPassword(const std::string &password) {
throw AuthException("Couldn't hash password!"); throw AuthException("Couldn't hash password!");
} }
return std::string(hash); return {hash};
} }
bool VerifyPassword(const std::string &password, const std::string &hash) { bool VerifyPassword(const std::string &password, const std::string &hash) {
@ -38,5 +83,102 @@ bool VerifyPassword(const std::string &password, const std::string &hash) {
} }
return ret == 0; return ret == 0;
} }
} // namespace BCrypt
namespace SHA {
#if OPENSSL_VERSION_MAJOR >= 3
std::string EncryptPasswordOpenSSL3(const std::string &password, const uint64_t number_of_iterations) {
unsigned char hash[SHA256_DIGEST_LENGTH];
EVP_MD_CTX *ctx = EVP_MD_CTX_new();
EVP_MD *md = EVP_MD_fetch(nullptr, "SHA2-256", nullptr);
EVP_DigestInit_ex(ctx, md, nullptr);
for (auto i = 0; i < number_of_iterations; i++) {
EVP_DigestUpdate(ctx, password.c_str(), password.size());
}
EVP_DigestFinal_ex(ctx, hash, nullptr);
EVP_MD_free(md);
EVP_MD_CTX_free(ctx);
std::stringstream result_stream;
for (auto hash_char : hash) {
result_stream << std::hex << std::setw(2) << std::setfill('0') << (int)hash_char;
}
return result_stream.str();
}
#else
std::string EncryptPasswordOpenSSL1_1(const std::string &password, const uint64_t number_of_iterations) {
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256_CTX sha256;
SHA256_Init(&sha256);
for (auto i = 0; i < number_of_iterations; i++) {
SHA256_Update(&sha256, password.c_str(), password.size());
}
SHA256_Final(hash, &sha256);
std::stringstream ss;
for (auto hash_char : hash) {
ss << std::hex << std::setw(2) << std::setfill('0') << (int)hash_char;
}
return ss.str();
}
#endif
std::string EncryptPassword(const std::string &password, const uint64_t number_of_iterations) {
#if OPENSSL_VERSION_MAJOR >= 3
return EncryptPasswordOpenSSL3(password, number_of_iterations);
#else
return EncryptPasswordOpenSSL1_1(password, number_of_iterations);
#endif
}
bool VerifyPassword(const std::string &password, const std::string &hash, const uint64_t number_of_iterations) {
auto password_hash = EncryptPassword(password, number_of_iterations);
return password_hash == hash;
}
} // namespace SHA
bool VerifyPassword(const std::string &password, const std::string &hash) {
const auto password_encryption_algorithm = utils::StringToEnum<PasswordEncryptionAlgorithm>(
FLAGS_password_encryption_algorithm, password_encryption_mappings);
if (!password_encryption_algorithm.has_value()) {
throw AuthException("Invalid password encryption flag '{}'!", FLAGS_password_encryption_algorithm);
}
switch (password_encryption_algorithm.value()) {
case PasswordEncryptionAlgorithm::BCRYPT:
return BCrypt::VerifyPassword(password, hash);
case PasswordEncryptionAlgorithm::SHA256:
return SHA::VerifyPassword(password, hash, ONE_SHA_ITERATION);
case PasswordEncryptionAlgorithm::SHA256_MULTIPLE:
return SHA::VerifyPassword(password, hash, MULTIPLE_SHA_ITERATIONS);
}
throw AuthException("Invalid password encryption flag '{}'!", FLAGS_password_encryption_algorithm);
}
std::string EncryptPassword(const std::string &password) {
const auto password_encryption_algorithm = utils::StringToEnum<PasswordEncryptionAlgorithm>(
FLAGS_password_encryption_algorithm, password_encryption_mappings);
if (!password_encryption_algorithm.has_value()) {
throw AuthException("Invalid password encryption flag '{}'!", FLAGS_password_encryption_algorithm);
}
switch (password_encryption_algorithm.value()) {
case PasswordEncryptionAlgorithm::BCRYPT:
return BCrypt::EncryptPassword(password);
case PasswordEncryptionAlgorithm::SHA256:
return SHA::EncryptPassword(password, ONE_SHA_ITERATION);
case PasswordEncryptionAlgorithm::SHA256_MULTIPLE:
return SHA::EncryptPassword(password, MULTIPLE_SHA_ITERATIONS);
}
}
} // namespace memgraph::auth } // namespace memgraph::auth

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Licensed as a Memgraph Enterprise file under the Memgraph Enterprise // Licensed as a Memgraph Enterprise file under the Memgraph Enterprise
// License (the "License"); by using this file, you agree to be bound by the terms of the License, and you may not use // License (the "License"); by using this file, you agree to be bound by the terms of the License, and you may not use
@ -11,10 +11,11 @@
#include <string> #include <string>
namespace memgraph::auth { namespace memgraph::auth {
enum class PasswordEncryptionAlgorithm : uint8_t { BCRYPT, SHA256, SHA256_MULTIPLE };
/// @throw AuthException if unable to encrypt the password. /// @throw AuthException if unable to encrypt the password.
const std::string EncryptPassword(const std::string &password); std::string EncryptPassword(const std::string &password);
/// @throw AuthException if unable to verify the password. /// @throw AuthException if unable to verify the password.
bool VerifyPassword(const std::string &password, const std::string &hash); bool VerifyPassword(const std::string &password, const std::string &hash);
} // namespace memgraph::auth } // namespace memgraph::auth

View File

@ -45,7 +45,9 @@ const std::vector<Permission> kPermissionsAll = {Permission::MATCH, Permis
Permission::FREE_MEMORY, Permission::TRIGGER, Permission::FREE_MEMORY, Permission::TRIGGER,
Permission::CONFIG, Permission::STREAM, Permission::CONFIG, Permission::STREAM,
Permission::MODULE_READ, Permission::MODULE_WRITE, Permission::MODULE_READ, Permission::MODULE_WRITE,
Permission::WEBSOCKET, Permission::TRANSACTION_MANAGEMENT}; Permission::WEBSOCKET, Permission::TRANSACTION_MANAGEMENT,
Permission::STORAGE_MODE};
} // namespace } // namespace
std::string PermissionToString(Permission permission) { std::string PermissionToString(Permission permission) {
@ -94,6 +96,8 @@ std::string PermissionToString(Permission permission) {
return "WEBSOCKET"; return "WEBSOCKET";
case Permission::TRANSACTION_MANAGEMENT: case Permission::TRANSACTION_MANAGEMENT:
return "TRANSACTION_MANAGEMENT"; return "TRANSACTION_MANAGEMENT";
case Permission::STORAGE_MODE:
return "STORAGE_MODE";
} }
} }

View File

@ -40,7 +40,8 @@ enum class Permission : uint64_t {
MODULE_READ = 1U << 18U, MODULE_READ = 1U << 18U,
MODULE_WRITE = 1U << 19U, MODULE_WRITE = 1U << 19U,
WEBSOCKET = 1U << 20U, WEBSOCKET = 1U << 20U,
TRANSACTION_MANAGEMENT = 1U << 21U TRANSACTION_MANAGEMENT = 1U << 21U,
STORAGE_MODE = 1U << 22U
}; };
// clang-format on // clang-format on

View File

@ -58,6 +58,8 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) {
return auth::Permission::MODULE_WRITE; return auth::Permission::MODULE_WRITE;
case query::AuthQuery::Privilege::WEBSOCKET: case query::AuthQuery::Privilege::WEBSOCKET:
return auth::Permission::WEBSOCKET; return auth::Permission::WEBSOCKET;
case query::AuthQuery::Privilege::STORAGE_MODE:
return auth::Permission::STORAGE_MODE;
case query::AuthQuery::Privilege::TRANSACTION_MANAGEMENT: case query::AuthQuery::Privilege::TRANSACTION_MANAGEMENT:
return auth::Permission::TRANSACTION_MANAGEMENT; return auth::Permission::TRANSACTION_MANAGEMENT;
} }

View File

@ -57,6 +57,7 @@
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "storage/v2/view.hpp" #include "storage/v2/view.hpp"
#include "telemetry/telemetry.hpp" #include "telemetry/telemetry.hpp"
#include "utils/enum.hpp"
#include "utils/event_counter.hpp" #include "utils/event_counter.hpp"
#include "utils/file.hpp" #include "utils/file.hpp"
#include "utils/flag_validation.hpp" #include "utils/flag_validation.hpp"
@ -103,42 +104,6 @@ constexpr const char *kMgUser = "MEMGRAPH_USER";
constexpr const char *kMgPassword = "MEMGRAPH_PASSWORD"; constexpr const char *kMgPassword = "MEMGRAPH_PASSWORD";
constexpr const char *kMgPassfile = "MEMGRAPH_PASSFILE"; constexpr const char *kMgPassfile = "MEMGRAPH_PASSFILE";
namespace {
std::string GetAllowedEnumValuesString(const auto &mappings) {
std::vector<std::string> allowed_values;
allowed_values.reserve(mappings.size());
std::transform(mappings.begin(), mappings.end(), std::back_inserter(allowed_values),
[](const auto &mapping) { return std::string(mapping.first); });
return memgraph::utils::Join(allowed_values, ", ");
}
enum class ValidationError : uint8_t { EmptyValue, InvalidValue };
memgraph::utils::BasicResult<ValidationError> IsValidEnumValueString(const auto &value, const auto &mappings) {
if (value.empty()) {
return ValidationError::EmptyValue;
}
if (std::find_if(mappings.begin(), mappings.end(), [&](const auto &mapping) { return mapping.first == value; }) ==
mappings.cend()) {
return ValidationError::InvalidValue;
}
return {};
}
template <typename Enum>
std::optional<Enum> StringToEnum(const auto &value, const auto &mappings) {
const auto mapping_iter =
std::find_if(mappings.begin(), mappings.end(), [&](const auto &mapping) { return mapping.first == value; });
if (mapping_iter == mappings.cend()) {
return std::nullopt;
}
return mapping_iter->second;
}
} // namespace
// Short help flag. // Short help flag.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_HIDDEN_bool(h, false, "Print usage and exit."); DEFINE_HIDDEN_bool(h, false, "Print usage and exit.");
@ -300,21 +265,21 @@ inline constexpr std::array isolation_level_mappings{
const std::string isolation_level_help_string = const std::string isolation_level_help_string =
fmt::format("Default isolation level used for the transactions. Allowed values: {}", fmt::format("Default isolation level used for the transactions. Allowed values: {}",
GetAllowedEnumValuesString(isolation_level_mappings)); memgraph::utils::GetAllowedEnumValuesString(isolation_level_mappings));
} // namespace } // namespace
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_string(isolation_level, "SNAPSHOT_ISOLATION", isolation_level_help_string.c_str(), { DEFINE_VALIDATED_string(isolation_level, "SNAPSHOT_ISOLATION", isolation_level_help_string.c_str(), {
if (const auto result = IsValidEnumValueString(value, isolation_level_mappings); result.HasError()) { if (const auto result = memgraph::utils::IsValidEnumValueString(value, isolation_level_mappings); result.HasError()) {
const auto error = result.GetError(); const auto error = result.GetError();
switch (error) { switch (error) {
case ValidationError::EmptyValue: { case memgraph::utils::ValidationError::EmptyValue: {
std::cout << "Isolation level cannot be empty." << std::endl; std::cout << "Isolation level cannot be empty." << std::endl;
break; break;
} }
case ValidationError::InvalidValue: { case memgraph::utils::ValidationError::InvalidValue: {
std::cout << "Invalid value for isolation level. Allowed values: " std::cout << "Invalid value for isolation level. Allowed values: "
<< GetAllowedEnumValuesString(isolation_level_mappings) << std::endl; << memgraph::utils::GetAllowedEnumValuesString(isolation_level_mappings) << std::endl;
break; break;
} }
} }
@ -327,7 +292,7 @@ DEFINE_VALIDATED_string(isolation_level, "SNAPSHOT_ISOLATION", isolation_level_h
namespace { namespace {
memgraph::storage::IsolationLevel ParseIsolationLevel() { memgraph::storage::IsolationLevel ParseIsolationLevel() {
const auto isolation_level = const auto isolation_level =
StringToEnum<memgraph::storage::IsolationLevel>(FLAGS_isolation_level, isolation_level_mappings); memgraph::utils::StringToEnum<memgraph::storage::IsolationLevel>(FLAGS_isolation_level, isolation_level_mappings);
MG_ASSERT(isolation_level, "Invalid isolation level"); MG_ASSERT(isolation_level, "Invalid isolation level");
return *isolation_level; return *isolation_level;
} }
@ -387,21 +352,21 @@ inline constexpr std::array log_level_mappings{
std::pair{"INFO"sv, spdlog::level::info}, std::pair{"WARNING"sv, spdlog::level::warn}, std::pair{"INFO"sv, spdlog::level::info}, std::pair{"WARNING"sv, spdlog::level::warn},
std::pair{"ERROR"sv, spdlog::level::err}, std::pair{"CRITICAL"sv, spdlog::level::critical}}; std::pair{"ERROR"sv, spdlog::level::err}, std::pair{"CRITICAL"sv, spdlog::level::critical}};
const std::string log_level_help_string = const std::string log_level_help_string = fmt::format("Minimum log level. Allowed values: {}",
fmt::format("Minimum log level. Allowed values: {}", GetAllowedEnumValuesString(log_level_mappings)); memgraph::utils::GetAllowedEnumValuesString(log_level_mappings));
} // namespace } // namespace
DEFINE_VALIDATED_string(log_level, "WARNING", log_level_help_string.c_str(), { DEFINE_VALIDATED_string(log_level, "WARNING", log_level_help_string.c_str(), {
if (const auto result = IsValidEnumValueString(value, log_level_mappings); result.HasError()) { if (const auto result = memgraph::utils::IsValidEnumValueString(value, log_level_mappings); result.HasError()) {
const auto error = result.GetError(); const auto error = result.GetError();
switch (error) { switch (error) {
case ValidationError::EmptyValue: { case memgraph::utils::ValidationError::EmptyValue: {
std::cout << "Log level cannot be empty." << std::endl; std::cout << "Log level cannot be empty." << std::endl;
break; break;
} }
case ValidationError::InvalidValue: { case memgraph::utils::ValidationError::InvalidValue: {
std::cout << "Invalid value for log level. Allowed values: " << GetAllowedEnumValuesString(log_level_mappings) std::cout << "Invalid value for log level. Allowed values: "
<< std::endl; << memgraph::utils::GetAllowedEnumValuesString(log_level_mappings) << std::endl;
break; break;
} }
} }
@ -413,7 +378,7 @@ DEFINE_VALIDATED_string(log_level, "WARNING", log_level_help_string.c_str(), {
namespace { namespace {
spdlog::level::level_enum ParseLogLevel() { spdlog::level::level_enum ParseLogLevel() {
const auto log_level = StringToEnum<spdlog::level::level_enum>(FLAGS_log_level, log_level_mappings); const auto log_level = memgraph::utils::StringToEnum<spdlog::level::level_enum>(FLAGS_log_level, log_level_mappings);
MG_ASSERT(log_level, "Invalid log level"); MG_ASSERT(log_level, "Invalid log level");
return *log_level; return *log_level;
} }

View File

@ -211,6 +211,21 @@ class IsolationLevelModificationInMulticommandTxException : public QueryExceptio
: QueryException("Isolation level cannot be modified in multicommand transactions.") {} : QueryException("Isolation level cannot be modified in multicommand transactions.") {}
}; };
class IsolationLevelModificationInAnalyticsException : public QueryException {
public:
IsolationLevelModificationInAnalyticsException()
: QueryException(
"Isolation level cannot be modified when storage mode is set to IN_MEMORY_ANALYTICAL."
"IN_MEMORY_ANALYTICAL mode doesn't provide any isolation guarantees, "
"you can think about it as an equivalent to READ_UNCOMMITED.") {}
};
class StorageModeModificationInMulticommandTxException : public QueryException {
public:
StorageModeModificationInMulticommandTxException()
: QueryException("Storage mode cannot be modified in multicommand transactions.") {}
};
class CreateSnapshotInMulticommandTxException final : public QueryException { class CreateSnapshotInMulticommandTxException final : public QueryException {
public: public:
CreateSnapshotInMulticommandTxException() CreateSnapshotInMulticommandTxException()

View File

@ -243,6 +243,9 @@ constexpr utils::TypeInfo query::TriggerQuery::kType{utils::TypeId::AST_TRIGGER_
constexpr utils::TypeInfo query::IsolationLevelQuery::kType{utils::TypeId::AST_ISOLATION_LEVEL_QUERY, constexpr utils::TypeInfo query::IsolationLevelQuery::kType{utils::TypeId::AST_ISOLATION_LEVEL_QUERY,
"IsolationLevelQuery", &query::Query::kType}; "IsolationLevelQuery", &query::Query::kType};
constexpr utils::TypeInfo query::StorageModeQuery::kType{utils::TypeId::AST_STORAGE_MODE_QUERY, "StorageModeQuery",
&query::Query::kType};
constexpr utils::TypeInfo query::CreateSnapshotQuery::kType{utils::TypeId::AST_CREATE_SNAPSHOT_QUERY, constexpr utils::TypeInfo query::CreateSnapshotQuery::kType{utils::TypeId::AST_CREATE_SNAPSHOT_QUERY,
"CreateSnapshotQuery", &query::Query::kType}; "CreateSnapshotQuery", &query::Query::kType};

View File

@ -2714,6 +2714,7 @@ class AuthQuery : public memgraph::query::Query {
MODULE_READ, MODULE_READ,
MODULE_WRITE, MODULE_WRITE,
WEBSOCKET, WEBSOCKET,
STORAGE_MODE,
TRANSACTION_MANAGEMENT TRANSACTION_MANAGEMENT
}; };
@ -2777,7 +2778,8 @@ const std::vector<AuthQuery::Privilege> kPrivilegesAll = {
AuthQuery::Privilege::FREE_MEMORY, AuthQuery::Privilege::TRIGGER, AuthQuery::Privilege::FREE_MEMORY, AuthQuery::Privilege::TRIGGER,
AuthQuery::Privilege::CONFIG, AuthQuery::Privilege::STREAM, AuthQuery::Privilege::CONFIG, AuthQuery::Privilege::STREAM,
AuthQuery::Privilege::MODULE_READ, AuthQuery::Privilege::MODULE_WRITE, AuthQuery::Privilege::MODULE_READ, AuthQuery::Privilege::MODULE_WRITE,
AuthQuery::Privilege::WEBSOCKET, AuthQuery::Privilege::TRANSACTION_MANAGEMENT}; AuthQuery::Privilege::WEBSOCKET, AuthQuery::Privilege::TRANSACTION_MANAGEMENT,
AuthQuery::Privilege::STORAGE_MODE};
class InfoQuery : public memgraph::query::Query { class InfoQuery : public memgraph::query::Query {
public: public:
@ -3046,6 +3048,29 @@ class IsolationLevelQuery : public memgraph::query::Query {
friend class AstStorage; friend class AstStorage;
}; };
class StorageModeQuery : public memgraph::query::Query {
public:
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
enum class StorageMode { IN_MEMORY_TRANSACTIONAL, IN_MEMORY_ANALYTICAL };
StorageModeQuery() = default;
DEFVISITABLE(QueryVisitor<void>);
memgraph::query::StorageModeQuery::StorageMode storage_mode_;
StorageModeQuery *Clone(AstStorage *storage) const override {
StorageModeQuery *object = storage->Create<StorageModeQuery>();
object->storage_mode_ = storage_mode_;
return object;
}
private:
friend class AstStorage;
};
class CreateSnapshotQuery : public memgraph::query::Query { class CreateSnapshotQuery : public memgraph::query::Query {
public: public:
static const utils::TypeInfo kType; static const utils::TypeInfo kType;

View File

@ -89,6 +89,7 @@ class LoadCsv;
class FreeMemoryQuery; class FreeMemoryQuery;
class TriggerQuery; class TriggerQuery;
class IsolationLevelQuery; class IsolationLevelQuery;
class StorageModeQuery;
class CreateSnapshotQuery; class CreateSnapshotQuery;
class StreamQuery; class StreamQuery;
class SettingQuery; class SettingQuery;
@ -134,6 +135,6 @@ class QueryVisitor
: public utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery, IndexQuery, AuthQuery, InfoQuery, : public utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery, IndexQuery, AuthQuery, InfoQuery,
ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery, FreeMemoryQuery, TriggerQuery, ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery, FreeMemoryQuery, TriggerQuery,
IsolationLevelQuery, CreateSnapshotQuery, StreamQuery, SettingQuery, VersionQuery, IsolationLevelQuery, CreateSnapshotQuery, StreamQuery, SettingQuery, VersionQuery,
ShowConfigQuery, TransactionQueueQuery, AnalyzeGraphQuery> {}; ShowConfigQuery, TransactionQueueQuery, StorageModeQuery, AnalyzeGraphQuery> {};
} // namespace memgraph::query } // namespace memgraph::query

View File

@ -487,6 +487,20 @@ antlrcpp::Any CypherMainVisitor::visitIsolationLevelQuery(MemgraphCypher::Isolat
return isolation_level_query; return isolation_level_query;
} }
antlrcpp::Any CypherMainVisitor::visitStorageModeQuery(MemgraphCypher::StorageModeQueryContext *ctx) {
auto *storage_mode_query = storage_->Create<StorageModeQuery>();
storage_mode_query->storage_mode_ = std::invoke([mode = ctx->storageMode()]() {
if (mode->IN_MEMORY_ANALYTICAL()) {
return StorageModeQuery::StorageMode::IN_MEMORY_ANALYTICAL;
}
return StorageModeQuery::StorageMode::IN_MEMORY_TRANSACTIONAL;
});
query_ = storage_mode_query;
return storage_mode_query;
}
antlrcpp::Any CypherMainVisitor::visitCreateSnapshotQuery(MemgraphCypher::CreateSnapshotQueryContext *ctx) { antlrcpp::Any CypherMainVisitor::visitCreateSnapshotQuery(MemgraphCypher::CreateSnapshotQueryContext *ctx) {
query_ = storage_->Create<CreateSnapshotQuery>(); query_ = storage_->Create<CreateSnapshotQuery>();
return query_; return query_;
@ -1511,6 +1525,7 @@ antlrcpp::Any CypherMainVisitor::visitPrivilege(MemgraphCypher::PrivilegeContext
if (ctx->MODULE_WRITE()) return AuthQuery::Privilege::MODULE_WRITE; if (ctx->MODULE_WRITE()) return AuthQuery::Privilege::MODULE_WRITE;
if (ctx->WEBSOCKET()) return AuthQuery::Privilege::WEBSOCKET; if (ctx->WEBSOCKET()) return AuthQuery::Privilege::WEBSOCKET;
if (ctx->TRANSACTION_MANAGEMENT()) return AuthQuery::Privilege::TRANSACTION_MANAGEMENT; if (ctx->TRANSACTION_MANAGEMENT()) return AuthQuery::Privilege::TRANSACTION_MANAGEMENT;
if (ctx->STORAGE_MODE()) return AuthQuery::Privilege::STORAGE_MODE;
LOG_FATAL("Should not get here - unknown privilege!"); LOG_FATAL("Should not get here - unknown privilege!");
} }

View File

@ -263,6 +263,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/ */
antlrcpp::Any visitIsolationLevelQuery(MemgraphCypher::IsolationLevelQueryContext *ctx) override; antlrcpp::Any visitIsolationLevelQuery(MemgraphCypher::IsolationLevelQueryContext *ctx) override;
/**
* @return StorageModeQuery*
*/
antlrcpp::Any visitStorageModeQuery(MemgraphCypher::StorageModeQueryContext *ctx) override;
/** /**
* @return CreateSnapshotQuery* * @return CreateSnapshotQuery*
*/ */

View File

@ -59,6 +59,8 @@ memgraphCypherKeyword : cypherKeyword
| HEADER | HEADER
| IDENTIFIED | IDENTIFIED
| ISOLATION | ISOLATION
| IN_MEMORY_ANALYTICAL
| IN_MEMORY_TRANSACTIONAL
| KAFKA | KAFKA
| LABELS | LABELS
| LEVEL | LEVEL
@ -88,6 +90,7 @@ memgraphCypherKeyword : cypherKeyword
| SNAPSHOT | SNAPSHOT
| START | START
| STATS | STATS
| STORAGE
| STREAM | STREAM
| STREAMS | STREAMS
| SYNC | SYNC
@ -127,6 +130,7 @@ query : cypherQuery
| freeMemoryQuery | freeMemoryQuery
| triggerQuery | triggerQuery
| isolationLevelQuery | isolationLevelQuery
| storageModeQuery
| createSnapshotQuery | createSnapshotQuery
| streamQuery | streamQuery
| settingQuery | settingQuery
@ -277,6 +281,7 @@ privilege : CREATE
| MODULE_WRITE | MODULE_WRITE
| WEBSOCKET | WEBSOCKET
| TRANSACTION_MANAGEMENT | TRANSACTION_MANAGEMENT
| STORAGE_MODE
; ;
granularPrivilege : NOTHING | READ | UPDATE | CREATE_DELETE ; granularPrivilege : NOTHING | READ | UPDATE | CREATE_DELETE ;
@ -354,6 +359,10 @@ isolationLevelScope : GLOBAL | SESSION | NEXT ;
isolationLevelQuery : SET isolationLevelScope TRANSACTION ISOLATION LEVEL isolationLevel ; isolationLevelQuery : SET isolationLevelScope TRANSACTION ISOLATION LEVEL isolationLevel ;
storageMode : IN_MEMORY_ANALYTICAL | IN_MEMORY_TRANSACTIONAL ;
storageModeQuery : STORAGE MODE storageMode ;
createSnapshotQuery : CREATE SNAPSHOT ; createSnapshotQuery : CREATE SNAPSHOT ;
streamName : symbolicName ; streamName : symbolicName ;

View File

@ -25,103 +25,107 @@ import CypherLexer ;
UNDERSCORE : '_' ; UNDERSCORE : '_' ;
AFTER : A F T E R ; AFTER : A F T E R ;
ALTER : A L T E R ; ALTER : A L T E R ;
ANALYZE : A N A L Y Z E ; ANALYZE : A N A L Y Z E ;
ASYNC : A S Y N C ; ASYNC : A S Y N C ;
AUTH : A U T H ; AUTH : A U T H ;
BAD : B A D ; BAD : B A D ;
BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ; BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ;
BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ; BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ;
BATCH_SIZE : B A T C H UNDERSCORE S I Z E ; BATCH_SIZE : B A T C H UNDERSCORE S I Z E ;
BEFORE : B E F O R E ; BEFORE : B E F O R E ;
BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ; BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ;
CALL : C A L L ; CALL : C A L L ;
CHECK : C H E C K ; CHECK : C H E C K ;
CLEAR : C L E A R ; CLEAR : C L E A R ;
COMMIT : C O M M I T ; COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ; COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ; CONFIG : C O N F I G ;
CONFIGS : C O N F I G S; CONFIGS : C O N F I G S;
CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ; CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ;
CREATE_DELETE : C R E A T E UNDERSCORE D E L E T E ; CREATE_DELETE : C R E A T E UNDERSCORE D E L E T E ;
CREDENTIALS : C R E D E N T I A L S ; CREDENTIALS : C R E D E N T I A L S ;
CSV : C S V ; CSV : C S V ;
DATA : D A T A ; DATA : D A T A ;
DELIMITER : D E L I M I T E R ; DELIMITER : D E L I M I T E R ;
DATABASE : D A T A B A S E ; DATABASE : D A T A B A S E ;
DENY : D E N Y ; DENY : D E N Y ;
DIRECTORY : D I R E C T O R Y ; DIRECTORY : D I R E C T O R Y ;
DROP : D R O P ; DROP : D R O P ;
DUMP : D U M P ; DUMP : D U M P ;
DURABILITY : D U R A B I L I T Y ; DURABILITY : D U R A B I L I T Y ;
EDGE_TYPES : E D G E UNDERSCORE T Y P E S ; EDGE_TYPES : E D G E UNDERSCORE T Y P E S ;
EXECUTE : E X E C U T E ; EXECUTE : E X E C U T E ;
FOR : F O R ; FOR : F O R ;
FOREACH : F O R E A C H; FOREACH : F O R E A C H;
FREE : F R E E ; FREE : F R E E ;
FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ; FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ;
FROM : F R O M ; FROM : F R O M ;
GLOBAL : G L O B A L ; GLOBAL : G L O B A L ;
GRANT : G R A N T ; GRANT : G R A N T ;
GRAPH : G R A P H ; GRAPH : G R A P H ;
GRANTS : G R A N T S ; GRANTS : G R A N T S ;
HEADER : H E A D E R ; HEADER : H E A D E R ;
IDENTIFIED : I D E N T I F I E D ; IDENTIFIED : I D E N T I F I E D ;
IGNORE : I G N O R E ; IGNORE : I G N O R E ;
ISOLATION : I S O L A T I O N ; ISOLATION : I S O L A T I O N ;
KAFKA : K A F K A ; IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ;
LABELS : L A B E L S ; IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ;
LEVEL : L E V E L ; KAFKA : K A F K A ;
LOAD : L O A D ; LABELS : L A B E L S ;
LOCK : L O C K ; LEVEL : L E V E L ;
MAIN : M A I N ; LOAD : L O A D ;
MODE : M O D E ; LOCK : L O C K ;
MODULE_READ : M O D U L E UNDERSCORE R E A D ; MAIN : M A I N ;
MODULE_WRITE : M O D U L E UNDERSCORE W R I T E ; MODE : M O D E ;
NEXT : N E X T ; MODULE_READ : M O D U L E UNDERSCORE R E A D ;
NO : N O ; MODULE_WRITE : M O D U L E UNDERSCORE W R I T E ;
NOTHING : N O T H I N G ; NEXT : N E X T ;
PASSWORD : P A S S W O R D ; NO : N O ;
PORT : P O R T ; NOTHING : N O T H I N G ;
PRIVILEGES : P R I V I L E G E S ; PASSWORD : P A S S W O R D ;
PULSAR : P U L S A R ; PORT : P O R T ;
READ : R E A D ; PRIVILEGES : P R I V I L E G E S ;
READ_FILE : R E A D UNDERSCORE F I L E ; PULSAR : P U L S A R ;
REGISTER : R E G I S T E R ; READ : R E A D ;
REPLICA : R E P L I C A ; READ_FILE : R E A D UNDERSCORE F I L E ;
REPLICAS : R E P L I C A S ; REGISTER : R E G I S T E R ;
REPLICATION : R E P L I C A T I O N ; REPLICA : R E P L I C A ;
REVOKE : R E V O K E ; REPLICAS : R E P L I C A S ;
ROLE : R O L E ; REPLICATION : R E P L I C A T I O N ;
ROLES : R O L E S ; REVOKE : R E V O K E ;
QUOTE : Q U O T E ; ROLE : R O L E ;
SERVICE_URL : S E R V I C E UNDERSCORE U R L ; ROLES : R O L E S ;
SESSION : S E S S I O N ; QUOTE : Q U O T E ;
SETTING : S E T T I N G ; SERVICE_URL : S E R V I C E UNDERSCORE U R L ;
SETTINGS : S E T T I N G S ; SESSION : S E S S I O N ;
SNAPSHOT : S N A P S H O T ; SETTING : S E T T I N G ;
START : S T A R T ; SETTINGS : S E T T I N G S ;
STATISTICS : S T A T I S T I C S ; SNAPSHOT : S N A P S H O T ;
STATS : S T A T S ; START : S T A R T ;
STOP : S T O P ; STATISTICS : S T A T I S T I C S ;
STREAM : S T R E A M ; STATS : S T A T S ;
STREAMS : S T R E A M S ; STOP : S T O P ;
SYNC : S Y N C ; STORAGE : S T O R A G E;
TERMINATE : T E R M I N A T E ; STORAGE_MODE : S T O R A G E UNDERSCORE MODE;
TIMEOUT : T I M E O U T ; STREAM : S T R E A M ;
TO : T O ; STREAMS : S T R E A M S ;
TOPICS : T O P I C S; SYNC : S Y N C ;
TRANSACTION : T R A N S A C T I O N ; TERMINATE : T E R M I N A T E ;
TRANSACTION_MANAGEMENT : T R A N S A C T I O N UNDERSCORE M A N A G E M E N T ; TIMEOUT : T I M E O U T ;
TRANSACTIONS : T R A N S A C T I O N S ; TO : T O ;
TRANSFORM : T R A N S F O R M ; TOPICS : T O P I C S;
TRIGGER : T R I G G E R ; TRANSACTION : T R A N S A C T I O N ;
TRIGGERS : T R I G G E R S ; TRANSACTION_MANAGEMENT : T R A N S A C T I O N UNDERSCORE M A N A G E M E N T ;
UNCOMMITTED : U N C O M M I T T E D ; TRANSACTIONS : T R A N S A C T I O N S ;
UNLOCK : U N L O C K ; TRANSFORM : T R A N S F O R M ;
UPDATE : U P D A T E ; TRIGGER : T R I G G E R ;
USER : U S E R ; TRIGGERS : T R I G G E R S ;
USERS : U S E R S ; UNCOMMITTED : U N C O M M I T T E D ;
VERSION : V E R S I O N ; UNLOCK : U N L O C K ;
WEBSOCKET : W E B S O C K E T ; UPDATE : U P D A T E ;
USER : U S E R ;
USERS : U S E R S ;
VERSION : V E R S I O N ;
WEBSOCKET : W E B S O C K E T ;

View File

@ -78,6 +78,8 @@ class PrivilegeExtractor : public QueryVisitor<void>, public HierarchicalTreeVis
void Visit(IsolationLevelQuery &isolation_level_query) override { AddPrivilege(AuthQuery::Privilege::CONFIG); } void Visit(IsolationLevelQuery &isolation_level_query) override { AddPrivilege(AuthQuery::Privilege::CONFIG); }
void Visit(StorageModeQuery & /*storage_mode_query*/) override { AddPrivilege(AuthQuery::Privilege::STORAGE_MODE); }
void Visit(CreateSnapshotQuery &create_snapshot_query) override { AddPrivilege(AuthQuery::Privilege::DURABILITY); } void Visit(CreateSnapshotQuery &create_snapshot_query) override { AddPrivilege(AuthQuery::Privilege::DURABILITY); }
void Visit(SettingQuery & /*setting_query*/) override { AddPrivilege(AuthQuery::Privilege::CONFIG); } void Visit(SettingQuery & /*setting_query*/) override { AddPrivilege(AuthQuery::Privilege::CONFIG); }

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -208,7 +208,10 @@ const trie::Trie kKeywords = {"union",
"websocket", "websocket",
"foreach", "foreach",
"labels", "labels",
"edge_types"}; "edge_types",
"off",
"in_memory_transactional",
"in_memory_analytical"};
// Unicode codepoints that are allowed at the start of the unescaped name. // Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts( const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(

View File

@ -20,6 +20,7 @@
#include <functional> #include <functional>
#include <iterator> #include <iterator>
#include <limits> #include <limits>
#include <memory>
#include <optional> #include <optional>
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
@ -67,6 +68,7 @@
#include "utils/settings.hpp" #include "utils/settings.hpp"
#include "utils/string.hpp" #include "utils/string.hpp"
#include "utils/tsc.hpp" #include "utils/tsc.hpp"
#include "utils/typeinfo.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
namespace EventCounter { namespace EventCounter {
@ -981,7 +983,8 @@ struct PullPlan {
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status, std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status,
TriggerContextCollector *trigger_context_collector = nullptr, TriggerContextCollector *trigger_context_collector = nullptr,
std::optional<size_t> memory_limit = {}); std::optional<size_t> memory_limit = {}, bool use_monotonic_memory = true);
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n, std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
const std::vector<Symbol> &output_symbols, const std::vector<Symbol> &output_symbols,
std::map<std::string, TypedValue> *summary); std::map<std::string, TypedValue> *summary);
@ -1005,16 +1008,25 @@ struct PullPlan {
// we have to keep track of any unsent results from previous `PullPlan::Pull` // we have to keep track of any unsent results from previous `PullPlan::Pull`
// manually by using this flag. // manually by using this flag.
bool has_unsent_results_ = false; bool has_unsent_results_ = false;
// In the case of LOAD CSV, we want to use only PoolResource without MonotonicMemoryResource
// to reuse allocated memory. As LOAD CSV is processing row by row
// it is possible to reduce memory usage significantly if MemoryResource deals with memory allocation
// can reuse memory that was allocated on processing the first row on all subsequent rows.
// This flag signals to `PullPlan::Pull` which MemoryResource to use
bool use_monotonic_memory_;
}; };
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query, PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status, std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status,
TriggerContextCollector *trigger_context_collector, const std::optional<size_t> memory_limit) TriggerContextCollector *trigger_context_collector, const std::optional<size_t> memory_limit,
bool use_monotonic_memory)
: plan_(plan), : plan_(plan),
cursor_(plan->plan().MakeCursor(execution_memory)), cursor_(plan->plan().MakeCursor(execution_memory)),
frame_(plan->symbol_table().max_position(), execution_memory), frame_(plan->symbol_table().max_position(), execution_memory),
memory_limit_(memory_limit) { memory_limit_(memory_limit),
use_monotonic_memory_(use_monotonic_memory) {
ctx_.db_accessor = dba; ctx_.db_accessor = dba;
ctx_.symbol_table = plan->symbol_table(); ctx_.symbol_table = plan->symbol_table();
ctx_.evaluation_context.timestamp = QueryTimestamp(); ctx_.evaluation_context.timestamp = QueryTimestamp();
@ -1023,7 +1035,14 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba); ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba);
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
if (license::global_license_checker.IsEnterpriseValidFast() && username.has_value() && dba) { if (license::global_license_checker.IsEnterpriseValidFast() && username.has_value() && dba) {
ctx_.auth_checker = interpreter_context->auth_checker->GetFineGrainedAuthChecker(*username, dba); auto auth_checker = interpreter_context->auth_checker->GetFineGrainedAuthChecker(*username, dba);
// if the user has global privileges to read, edit and write anything, we don't need to perform authorization
// otherwise, we do assign the auth checker to check for label access control
if (!auth_checker->HasGlobalPrivilegeOnVertices(AuthQuery::FineGrainedPrivilege::CREATE_DELETE) ||
!auth_checker->HasGlobalPrivilegeOnEdges(AuthQuery::FineGrainedPrivilege::CREATE_DELETE)) {
ctx_.auth_checker = std::move(auth_checker);
}
} }
#endif #endif
if (interpreter_context->config.execution_timeout_sec > 0) { if (interpreter_context->config.execution_timeout_sec > 0) {
@ -1043,21 +1062,28 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
// single `Pull`. // single `Pull`.
static constexpr size_t stack_size = 256UL * 1024UL; static constexpr size_t stack_size = 256UL * 1024UL;
char stack_data[stack_size]; char stack_data[stack_size];
utils::ResourceWithOutOfMemoryException resource_with_exception;
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
// We can throw on every query because a simple queries for deleting will use only
// the stack allocated buffer.
// Also, we want to throw only when the query engine requests more memory and not the storage
// so we add the exception to the allocator.
// TODO (mferencevic): Tune the parameters accordingly.
utils::PoolResource pool_memory(128, 1024, &monotonic_memory, utils::NewDeleteResource());
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
utils::ResourceWithOutOfMemoryException resource_with_exception;
utils::MonotonicBufferResource monotonic_memory{&stack_data[0], stack_size, &resource_with_exception};
std::optional<utils::PoolResource> pool_memory;
if (!use_monotonic_memory_) {
pool_memory.emplace(8, kExecutionPoolMaxBlockSize, utils::NewDeleteResource(), utils::NewDeleteResource());
} else {
// We can throw on every query because a simple queries for deleting will use only
// the stack allocated buffer.
// Also, we want to throw only when the query engine requests more memory and not the storage
// so we add the exception to the allocator.
// TODO (mferencevic): Tune the parameters accordingly.
pool_memory.emplace(128, 1024, &monotonic_memory, utils::NewDeleteResource());
}
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
if (memory_limit_) { if (memory_limit_) {
maybe_limited_resource.emplace(&pool_memory, *memory_limit_); maybe_limited_resource.emplace(&*pool_memory, *memory_limit_);
ctx_.evaluation_context.memory = &*maybe_limited_resource; ctx_.evaluation_context.memory = &*maybe_limited_resource;
} else { } else {
ctx_.evaluation_context.memory = &pool_memory; ctx_.evaluation_context.memory = &*pool_memory;
} }
// Returns true if a result was pulled. // Returns true if a result was pulled.
@ -1143,7 +1169,6 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
if (in_explicit_transaction_) { if (in_explicit_transaction_) {
throw ExplicitTransactionUsageException("Nested transactions are not supported."); throw ExplicitTransactionUsageException("Nested transactions are not supported.");
} }
in_explicit_transaction_ = true; in_explicit_transaction_ = true;
expect_rollback_ = false; expect_rollback_ = false;
@ -1217,16 +1242,19 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
if (memory_limit) { if (memory_limit) {
spdlog::info("Running query with memory limit of {}", utils::GetReadableSize(*memory_limit)); spdlog::info("Running query with memory limit of {}", utils::GetReadableSize(*memory_limit));
} }
auto clauses = cypher_query->single_query_->clauses_;
if (const auto &clauses = cypher_query->single_query_->clauses_; std::any_of( bool contains_csv = false;
clauses.begin(), clauses.end(), [](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) { if (std::any_of(clauses.begin(), clauses.end(),
[](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
notifications->emplace_back( notifications->emplace_back(
SeverityLevel::INFO, NotificationCode::LOAD_CSV_TIP, SeverityLevel::INFO, NotificationCode::LOAD_CSV_TIP,
"It's important to note that the parser parses the values as strings. It's up to the user to " "It's important to note that the parser parses the values as strings. It's up to the user to "
"convert the parsed row values to the appropriate type. This can be done using the built-in " "convert the parsed row values to the appropriate type. This can be done using the built-in "
"conversion functions such as ToInteger, ToFloat, ToBoolean etc."); "conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
contains_csv = true;
} }
// If this is LOAD CSV query, use PoolResource without MonotonicMemoryResource as we want to reuse allocated memory
auto use_monotonic_memory = !contains_csv;
auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query,
parsed_query.parameters, parsed_query.parameters,
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, dba); parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, dba);
@ -1249,7 +1277,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
} }
auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context, auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context,
execution_memory, StringPointerToOptional(username), transaction_status, execution_memory, StringPointerToOptional(username), transaction_status,
trigger_context_collector, memory_limit); trigger_context_collector, memory_limit, use_monotonic_memory);
return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges), return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
[pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary]( [pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary](
AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> { AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
@ -1971,7 +1999,16 @@ constexpr auto ToStorageIsolationLevel(const IsolationLevelQuery::IsolationLevel
} }
} }
PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_explicit_transaction, constexpr auto ToStorageMode(const StorageModeQuery::StorageMode storage_mode) noexcept {
switch (storage_mode) {
case StorageModeQuery::StorageMode::IN_MEMORY_TRANSACTIONAL:
return storage::StorageMode::IN_MEMORY_TRANSACTIONAL;
case StorageModeQuery::StorageMode::IN_MEMORY_ANALYTICAL:
return storage::StorageMode::IN_MEMORY_ANALYTICAL;
}
}
PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
InterpreterContext *interpreter_context, Interpreter *interpreter) { InterpreterContext *interpreter_context, Interpreter *interpreter) {
if (in_explicit_transaction) { if (in_explicit_transaction) {
throw IsolationLevelModificationInMulticommandTxException(); throw IsolationLevelModificationInMulticommandTxException();
@ -1986,7 +2023,15 @@ PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_expli
interpreter]() -> std::function<void()> { interpreter]() -> std::function<void()> {
switch (isolation_level_query->isolation_level_scope_) { switch (isolation_level_query->isolation_level_scope_) {
case IsolationLevelQuery::IsolationLevelScope::GLOBAL: case IsolationLevelQuery::IsolationLevelScope::GLOBAL:
return [interpreter_context, isolation_level] { interpreter_context->db->SetIsolationLevel(isolation_level); }; return [interpreter_context, isolation_level] {
if (auto maybe_error = interpreter_context->db->SetIsolationLevel(isolation_level); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case storage::Storage::SetIsolationLevelError::DisabledForAnalyticalMode:
throw IsolationLevelModificationInAnalyticsException();
break;
}
}
};
case IsolationLevelQuery::IsolationLevelScope::SESSION: case IsolationLevelQuery::IsolationLevelScope::SESSION:
return [interpreter, isolation_level] { interpreter->SetSessionIsolationLevel(isolation_level); }; return [interpreter, isolation_level] { interpreter->SetSessionIsolationLevel(isolation_level); };
case IsolationLevelQuery::IsolationLevelScope::NEXT: case IsolationLevelQuery::IsolationLevelScope::NEXT:
@ -2004,6 +2049,41 @@ PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, bool in_expli
RWType::NONE}; RWType::NONE};
} }
PreparedQuery PrepareStorageModeQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
InterpreterContext *interpreter_context) {
if (in_explicit_transaction) {
throw StorageModeModificationInMulticommandTxException();
}
auto *storage_mode_query = utils::Downcast<StorageModeQuery>(parsed_query.query);
MG_ASSERT(storage_mode_query);
const auto storage_mode = ToStorageMode(storage_mode_query->storage_mode_);
auto exists_active_transaction = interpreter_context->interpreters.WithLock([](const auto &interpreters_) {
return std::any_of(interpreters_.begin(), interpreters_.end(), [](const auto &interpreter) {
return interpreter->transaction_status_.load() != TransactionStatus::IDLE;
});
});
if (exists_active_transaction) {
spdlog::info(
"Storage mode will be modified when there are no other active transactions. Check the status of the "
"transactions using 'SHOW TRANSACTIONS' query and ensure no other transactions are active.");
}
auto callback = [storage_mode, interpreter_context]() -> std::function<void()> {
return [interpreter_context, storage_mode] { interpreter_context->db->SetStorageMode(storage_mode); };
}();
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[callback = std::move(callback)](AnyStream * /*stream*/,
std::optional<int> /*n*/) -> std::optional<QueryHandlerResult> {
callback();
return QueryHandlerResult::COMMIT;
},
RWType::NONE};
}
PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_explicit_transaction, PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
InterpreterContext *interpreter_context) { InterpreterContext *interpreter_context) {
if (in_explicit_transaction) { if (in_explicit_transaction) {
@ -2014,11 +2094,18 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
{}, {},
std::move(parsed_query.required_privileges), std::move(parsed_query.required_privileges),
[interpreter_context](AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> { [interpreter_context](AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
if (auto maybe_error = interpreter_context->db->CreateSnapshot(); maybe_error.HasError()) { if (auto maybe_error = interpreter_context->db->CreateSnapshot({}); maybe_error.HasError()) {
switch (maybe_error.GetError()) { switch (maybe_error.GetError()) {
case storage::Storage::CreateSnapshotError::DisabledForReplica: case storage::Storage::CreateSnapshotError::DisabledForReplica:
throw utils::BasicException( throw utils::BasicException(
"Failed to create a snapshot. Replica instances are not allowed to create them."); "Failed to create a snapshot. Replica instances are not allowed to create them.");
case storage::Storage::CreateSnapshotError::DisabledForAnalyticsPeriodicCommit:
spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.",
"https://memgr.ph/replication"));
break;
case storage::Storage::CreateSnapshotError::ReachedMaxNumTries:
spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support");
break;
} }
} }
return QueryHandlerResult::COMMIT; return QueryHandlerResult::COMMIT;
@ -2593,18 +2680,18 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
std::optional<std::string> user = StringPointerToOptional(username); std::optional<std::string> user = StringPointerToOptional(username);
username_ = user; username_ = user;
query_executions_.emplace_back(std::make_unique<QueryExecution>());
auto &query_execution = query_executions_.back();
std::optional<int> qid =
in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
// Handle transaction control queries. // Handle transaction control queries.
const auto upper_case_query = utils::ToUpperCase(query_string); const auto upper_case_query = utils::ToUpperCase(query_string);
const auto trimmed_query = utils::Trim(upper_case_query); const auto trimmed_query = utils::Trim(upper_case_query);
if (trimmed_query == "BEGIN" || trimmed_query == "COMMIT" || trimmed_query == "ROLLBACK") { if (trimmed_query == "BEGIN" || trimmed_query == "COMMIT" || trimmed_query == "ROLLBACK") {
query_executions_.emplace_back(
std::make_unique<QueryExecution>(utils::MonotonicBufferResource(kExecutionMemoryBlockSize)));
auto &query_execution = query_executions_.back();
std::optional<int> qid =
in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
query_execution->prepared_query.emplace(PrepareTransactionQuery(trimmed_query)); query_execution->prepared_query.emplace(PrepareTransactionQuery(trimmed_query));
return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid}; return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid};
} }
@ -2620,18 +2707,43 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
// If we're not in an explicit transaction block and we have an open // If we're not in an explicit transaction block and we have an open
// transaction, abort it since we're about to prepare a new query. // transaction, abort it since we're about to prepare a new query.
else if (db_accessor_) { else if (db_accessor_) {
AbortCommand(&query_execution); query_executions_.emplace_back(
std::make_unique<QueryExecution>(utils::MonotonicBufferResource(kExecutionMemoryBlockSize)));
AbortCommand(&query_executions_.back());
} }
std::unique_ptr<QueryExecution> *query_execution_ptr = nullptr;
try { try {
// Set a default cost estimate of 0. Individual queries can overwrite this query_executions_.emplace_back(
// field with an improved estimate. std::make_unique<QueryExecution>(utils::MonotonicBufferResource(kExecutionMemoryBlockSize)));
query_execution->summary["cost_estimate"] = 0.0; query_execution_ptr = &query_executions_.back();
utils::Timer parsing_timer; utils::Timer parsing_timer;
ParsedQuery parsed_query = ParsedQuery parsed_query =
ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query); ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query);
query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count(); TypedValue parsing_time{parsing_timer.Elapsed().count()};
if (utils::Downcast<CypherQuery>(parsed_query.query)) {
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
if (const auto &clauses = cypher_query->single_query_->clauses_;
std::any_of(clauses.begin(), clauses.end(),
[](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
// Using PoolResource without MonotonicMemoryResouce for LOAD CSV reduces memory usage.
// QueryExecution MemoryResource is mostly used for allocations done on Frame and storing `row`s
query_executions_[query_executions_.size() - 1] = std::make_unique<QueryExecution>(
utils::PoolResource(1, kExecutionPoolMaxBlockSize, utils::NewDeleteResource(), utils::NewDeleteResource()));
query_execution_ptr = &query_executions_.back();
}
}
auto &query_execution = query_executions_.back();
std::optional<int> qid =
in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
query_execution->summary["parsing_time"] = std::move(parsing_time);
// Set a default cost estimate of 0. Individual queries can overwrite this
// field with an improved estimate.
query_execution->summary["cost_estimate"] = 0.0;
// Some queries require an active transaction in order to be prepared. // Some queries require an active transaction in order to be prepared.
if (!in_explicit_transaction_ && if (!in_explicit_transaction_ &&
@ -2651,12 +2763,14 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
utils::Timer planning_timer; utils::Timer planning_timer;
PreparedQuery prepared_query; PreparedQuery prepared_query;
utils::MemoryResource *memory_resource =
std::visit([](auto &execution_memory) -> utils::MemoryResource * { return &execution_memory; },
query_execution->execution_memory);
if (utils::Downcast<CypherQuery>(parsed_query.query)) { if (utils::Downcast<CypherQuery>(parsed_query.query)) {
prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, prepared_query =
&*execution_db_accessor_, &query_execution->execution_memory, PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&query_execution->notifications, username, &transaction_status_, &*execution_db_accessor_, memory_resource, &query_execution->notifications, username,
trigger_context_collector_ ? &*trigger_context_collector_ : nullptr); &transaction_status_, trigger_context_collector_ ? &*trigger_context_collector_ : nullptr);
} else if (utils::Downcast<ExplainQuery>(parsed_query.query)) { } else if (utils::Downcast<ExplainQuery>(parsed_query.query)) {
prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, &query_execution->execution_memory_with_exception); &*execution_db_accessor_, &query_execution->execution_memory_with_exception);
@ -2666,7 +2780,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
&*execution_db_accessor_, &query_execution->execution_memory_with_exception, username, &transaction_status_); &*execution_db_accessor_, &query_execution->execution_memory_with_exception, username, &transaction_status_);
} else if (utils::Downcast<DumpQuery>(parsed_query.query)) { } else if (utils::Downcast<DumpQuery>(parsed_query.query)) {
prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_, prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_,
&query_execution->execution_memory); memory_resource);
} else if (utils::Downcast<IndexQuery>(parsed_query.query)) { } else if (utils::Downcast<IndexQuery>(parsed_query.query)) {
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_, prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
&query_execution->notifications, interpreter_context_); &query_execution->notifications, interpreter_context_);
@ -2713,6 +2827,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_); prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_);
} else if (utils::Downcast<VersionQuery>(parsed_query.query)) { } else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_); prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
} else if (utils::Downcast<StorageModeQuery>(parsed_query.query)) {
prepared_query = PrepareStorageModeQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
} else if (utils::Downcast<TransactionQueueQuery>(parsed_query.query)) { } else if (utils::Downcast<TransactionQueueQuery>(parsed_query.query)) {
prepared_query = PrepareTransactionQueueQuery(std::move(parsed_query), username_, in_explicit_transaction_, prepared_query = PrepareTransactionQueueQuery(std::move(parsed_query), username_, in_explicit_transaction_,
interpreter_context_, &*execution_db_accessor_); interpreter_context_, &*execution_db_accessor_);
@ -2738,7 +2854,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid}; return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid};
} catch (const utils::BasicException &) { } catch (const utils::BasicException &) {
EventCounter::IncrementCounter(EventCounter::FailedQuery); EventCounter::IncrementCounter(EventCounter::FailedQuery);
AbortCommand(&query_execution); AbortCommand(query_execution_ptr);
throw; throw;
} }
} }

View File

@ -51,6 +51,7 @@ extern const Event FailedQuery;
namespace memgraph::query { namespace memgraph::query {
inline constexpr size_t kExecutionMemoryBlockSize = 1UL * 1024UL * 1024UL; inline constexpr size_t kExecutionMemoryBlockSize = 1UL * 1024UL * 1024UL;
inline constexpr size_t kExecutionPoolMaxBlockSize = 32768UL; // 2 ^ 15
class AuthQueryHandler { class AuthQueryHandler {
public: public:
@ -340,13 +341,29 @@ class Interpreter final {
private: private:
struct QueryExecution { struct QueryExecution {
std::optional<PreparedQuery> prepared_query; std::optional<PreparedQuery> prepared_query;
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize}; std::variant<utils::MonotonicBufferResource, utils::PoolResource> execution_memory;
utils::ResourceWithOutOfMemoryException execution_memory_with_exception{&execution_memory}; utils::ResourceWithOutOfMemoryException execution_memory_with_exception;
std::map<std::string, TypedValue> summary; std::map<std::string, TypedValue> summary;
std::vector<Notification> notifications; std::vector<Notification> notifications;
explicit QueryExecution() = default; explicit QueryExecution(utils::MonotonicBufferResource monotonic_memory)
: execution_memory(std::move(monotonic_memory)) {
std::visit(
[&](auto &memory_resource) {
execution_memory_with_exception = utils::ResourceWithOutOfMemoryException(&memory_resource);
},
execution_memory);
};
explicit QueryExecution(utils::PoolResource pool_resource) : execution_memory(std::move(pool_resource)) {
std::visit(
[&](auto &memory_resource) {
execution_memory_with_exception = utils::ResourceWithOutOfMemoryException(&memory_resource);
},
execution_memory);
};
QueryExecution(const QueryExecution &) = delete; QueryExecution(const QueryExecution &) = delete;
QueryExecution(QueryExecution &&) = default; QueryExecution(QueryExecution &&) = default;
QueryExecution &operator=(const QueryExecution &) = delete; QueryExecution &operator=(const QueryExecution &) = delete;
@ -357,7 +374,7 @@ class Interpreter final {
// destroy the prepared query which is using that instance // destroy the prepared query which is using that instance
// of execution memory. // of execution memory.
prepared_query.reset(); prepared_query.reset();
execution_memory.Release(); std::visit([](auto &memory_resource) { memory_resource.Release(); }, execution_memory);
} }
}; };
@ -445,7 +462,9 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
try { try {
// Wrap the (statically polymorphic) stream type into a common type which // Wrap the (statically polymorphic) stream type into a common type which
// the handler knows. // the handler knows.
AnyStream stream{result_stream, &query_execution->execution_memory}; AnyStream stream{result_stream,
std::visit([](auto &execution_memory) -> utils::MemoryResource * { return &execution_memory; },
query_execution->execution_memory)};
const auto maybe_res = query_execution->prepared_query->query_handler(&stream, n); const auto maybe_res = query_execution->prepared_query->query_handler(&stream, n);
// Stream is using execution memory of the query_execution which // Stream is using execution memory of the query_execution which
// can be deleted after its execution so the stream should be cleared // can be deleted after its execution so the stream should be cleared

View File

@ -564,7 +564,11 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor {
*/ */
auto compare_indices = [](std::optional<LabelPropertyIndex> &found, std::optional<storage::IndexStats> &new_stats, auto compare_indices = [](std::optional<LabelPropertyIndex> &found, std::optional<storage::IndexStats> &new_stats,
int vertex_count) { int vertex_count) {
if (!new_stats.has_value() || vertex_count / 10.0 > found->vertex_count) { if (!new_stats.has_value()) {
return 0;
}
if (vertex_count / 10.0 > found->vertex_count) {
return 1; return 1;
} }
int cmp_avg_group = utils::CompareDecimal(new_stats->avg_group_size, found->index_stats->avg_group_size); int cmp_avg_group = utils::CompareDecimal(new_stats->avg_group_size, found->index_stats->avg_group_size);

View File

@ -117,6 +117,7 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
// current code always follows the logical pattern of "create a delta" and // current code always follows the logical pattern of "create a delta" and
// "modify in-place". Additionally, the created delta will make other // "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR. // transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value); CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value);
edge_.ptr->properties.SetProperty(property, value); edge_.ptr->properties.SetProperty(property, value);

View File

@ -12,6 +12,7 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include "storage/v2/property_value.hpp" #include "storage/v2/property_value.hpp"
#include "storage/v2/transaction.hpp" #include "storage/v2/transaction.hpp"
#include "storage/v2/view.hpp" #include "storage/v2/view.hpp"
@ -95,6 +96,9 @@ inline bool PrepareForWrite(Transaction *transaction, TObj *object) {
/// a `DELETE_OBJECT` delta). /// a `DELETE_OBJECT` delta).
/// @throw std::bad_alloc /// @throw std::bad_alloc
inline Delta *CreateDeleteObjectDelta(Transaction *transaction) { inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
if (transaction->storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
return nullptr;
}
transaction->EnsureCommitTimestampExists(); transaction->EnsureCommitTimestampExists();
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(), return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(),
transaction->command_id); transaction->command_id);
@ -105,6 +109,9 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
/// @throw std::bad_alloc /// @throw std::bad_alloc
template <typename TObj, class... Args> template <typename TObj, class... Args>
inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&...args) { inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&...args) {
if (transaction->storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
return;
}
transaction->EnsureCommitTimestampExists(); transaction->EnsureCommitTimestampExists();
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)..., transaction->commit_timestamp.get(), auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)..., transaction->commit_timestamp.get(),
transaction->command_id); transaction->command_id);

View File

@ -31,6 +31,7 @@
#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp" #include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/transaction.hpp" #include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp" #include "storage/v2/vertex_accessor.hpp"
#include "utils/file.hpp" #include "utils/file.hpp"
@ -316,6 +317,7 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const {
Storage::Storage(Config config) Storage::Storage(Config config)
: indices_(&constraints_, config.items), : indices_(&constraints_, config.items),
isolation_level_(config.transaction.isolation_level), isolation_level_(config.transaction.isolation_level),
storage_mode_(StorageMode::IN_MEMORY_TRANSACTIONAL),
config_(config), config_(config),
snapshot_directory_(config_.durability.storage_directory / durability::kSnapshotDirectory), snapshot_directory_(config_.durability.storage_directory / durability::kSnapshotDirectory),
wal_directory_(config_.durability.storage_directory / durability::kWalDirectory), wal_directory_(config_.durability.storage_directory / durability::kWalDirectory),
@ -398,12 +400,19 @@ Storage::Storage(Config config)
} }
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) { if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] { snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] {
if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) { if (auto maybe_error = this->CreateSnapshot({true}); maybe_error.HasError()) {
switch (maybe_error.GetError()) { switch (maybe_error.GetError()) {
case CreateSnapshotError::DisabledForReplica: case CreateSnapshotError::DisabledForReplica:
spdlog::warn( spdlog::warn(
utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication")); utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication"));
break; break;
case CreateSnapshotError::DisabledForAnalyticsPeriodicCommit:
spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.",
"https://memgr.ph/durability"));
break;
case storage::Storage::CreateSnapshotError::ReachedMaxNumTries:
spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support");
break;
} }
} }
}); });
@ -446,23 +455,30 @@ Storage::~Storage() {
snapshot_runner_.Stop(); snapshot_runner_.Stop();
} }
if (config_.durability.snapshot_on_exit) { if (config_.durability.snapshot_on_exit) {
if (auto maybe_error = this->CreateSnapshot(); maybe_error.HasError()) { if (auto maybe_error = this->CreateSnapshot({false}); maybe_error.HasError()) {
switch (maybe_error.GetError()) { switch (maybe_error.GetError()) {
case CreateSnapshotError::DisabledForReplica: case CreateSnapshotError::DisabledForReplica:
spdlog::warn(utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication")); spdlog::warn(utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication"));
break; break;
case CreateSnapshotError::DisabledForAnalyticsPeriodicCommit:
spdlog::warn(utils::MessageWithLink("Periodic snapshots are disabled for analytical mode.",
"https://memgr.ph/replication"));
break;
case storage::Storage::CreateSnapshotError::ReachedMaxNumTries:
spdlog::warn("Failed to create snapshot. Reached max number of tries. Please contact support");
break;
} }
} }
} }
} }
Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level) Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level, StorageMode storage_mode)
: storage_(storage), : storage_(storage),
// The lock must be acquired before creating the transaction object to // The lock must be acquired before creating the transaction object to
// prevent freshly created transactions from dangling in an active state // prevent freshly created transactions from dangling in an active state
// during exclusive operations. // during exclusive operations.
storage_guard_(storage_->main_lock_), storage_guard_(storage_->main_lock_),
transaction_(storage->CreateTransaction(isolation_level)), transaction_(storage->CreateTransaction(isolation_level, storage_mode)),
is_transaction_active_(true), is_transaction_active_(true),
config_(storage->config_.items) {} config_(storage->config_.items) {}
@ -490,11 +506,16 @@ VertexAccessor Storage::Accessor::CreateVertex() {
OOMExceptionEnabler oom_exception; OOMExceptionEnabler oom_exception;
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel); auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
auto acc = storage_->vertices_.access(); auto acc = storage_->vertices_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta}); auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
MG_ASSERT(inserted, "The vertex must be inserted here!"); MG_ASSERT(inserted, "The vertex must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!"); MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!");
delta->prev.Set(&*it);
if (delta) {
delta->prev.Set(&*it);
}
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_); return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_);
} }
@ -509,11 +530,14 @@ VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) {
storage_->vertex_id_.store(std::max(storage_->vertex_id_.load(std::memory_order_acquire), gid.AsUint() + 1), storage_->vertex_id_.store(std::max(storage_->vertex_id_.load(std::memory_order_acquire), gid.AsUint() + 1),
std::memory_order_release); std::memory_order_release);
auto acc = storage_->vertices_.access(); auto acc = storage_->vertices_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Vertex{gid, delta}); auto [it, inserted] = acc.insert(Vertex{gid, delta});
MG_ASSERT(inserted, "The vertex must be inserted here!"); MG_ASSERT(inserted, "The vertex must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!"); MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!");
delta->prev.Set(&*it); if (delta) {
delta->prev.Set(&*it);
}
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_); return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_);
} }
@ -656,12 +680,15 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
EdgeRef edge(gid); EdgeRef edge(gid);
if (config_.properties_on_edges) { if (config_.properties_on_edges) {
auto acc = storage_->edges_.access(); auto acc = storage_->edges_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Edge(gid, delta)); auto [it, inserted] = acc.insert(Edge(gid, delta));
MG_ASSERT(inserted, "The edge must be inserted here!"); MG_ASSERT(inserted, "The edge must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Edge accessor!"); MG_ASSERT(it != acc.end(), "Invalid Edge accessor!");
edge = EdgeRef(&*it); edge = EdgeRef(&*it);
delta->prev.Set(&*it); if (delta) {
delta->prev.Set(&*it);
}
} }
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
@ -724,12 +751,15 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
EdgeRef edge(gid); EdgeRef edge(gid);
if (config_.properties_on_edges) { if (config_.properties_on_edges) {
auto acc = storage_->edges_.access(); auto acc = storage_->edges_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Edge(gid, delta)); auto [it, inserted] = acc.insert(Edge(gid, delta));
MG_ASSERT(inserted, "The edge must be inserted here!"); MG_ASSERT(inserted, "The edge must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Edge accessor!"); MG_ASSERT(it != acc.end(), "Invalid Edge accessor!");
edge = EdgeRef(&*it); edge = EdgeRef(&*it);
delta->prev.Set(&*it); if (delta) {
delta->prev.Set(&*it);
}
} }
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
@ -1380,7 +1410,7 @@ VerticesIterable Storage::Accessor::Vertices(LabelId label, PropertyId property,
storage_->indices_.label_property_index.Vertices(label, property, lower_bound, upper_bound, view, &transaction_)); storage_->indices_.label_property_index.Vertices(label, property, lower_bound, upper_bound, view, &transaction_));
} }
Transaction Storage::CreateTransaction(IsolationLevel isolation_level) { Transaction Storage::CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) {
// We acquire the transaction engine lock here because we access (and // We acquire the transaction engine lock here because we access (and
// modify) the transaction engine variables (`transaction_id` and // modify) the transaction engine variables (`transaction_id` and
// `timestamp`) below. // `timestamp`) below.
@ -1401,7 +1431,7 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level) {
start_timestamp = timestamp_++; start_timestamp = timestamp_++;
} }
} }
return {transaction_id, start_timestamp, isolation_level}; return {transaction_id, start_timestamp, isolation_level, storage_mode};
} }
template <bool force> template <bool force>
@ -1907,27 +1937,47 @@ bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation opera
return finalized_on_all_replicas; return finalized_on_all_replicas;
} }
utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot() { utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot(std::optional<bool> is_periodic) {
if (replication_role_.load() != ReplicationRole::MAIN) { if (replication_role_.load() != ReplicationRole::MAIN) {
return CreateSnapshotError::DisabledForReplica; return CreateSnapshotError::DisabledForReplica;
} }
auto snapshot_creator = [this]() {
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_);
// Create snapshot.
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_, &name_id_mapper_,
&indices_, &constraints_, config_, uuid_, epoch_id_, epoch_history_, &file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);
};
std::lock_guard snapshot_guard(snapshot_lock_); std::lock_guard snapshot_guard(snapshot_lock_);
// Take master RW lock (for reading). auto should_try_shared{true};
std::shared_lock<utils::RWLock> storage_guard(main_lock_); auto max_num_tries{10};
while (max_num_tries) {
if (should_try_shared) {
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
snapshot_creator();
return {};
}
} else {
std::unique_lock main_guard{main_lock_};
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) {
if (is_periodic && *is_periodic) {
return CreateSnapshotError::DisabledForAnalyticsPeriodicCommit;
}
snapshot_creator();
return {};
}
}
should_try_shared = !should_try_shared;
max_num_tries--;
}
// Create the transaction used to create the snapshot. return CreateSnapshotError::ReachedMaxNumTries;
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION);
// Create snapshot.
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_, &name_id_mapper_,
&indices_, &constraints_, config_, uuid_, epoch_id_, epoch_history_, &file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);
return {};
} }
bool Storage::LockPath() { bool Storage::LockPath() {
@ -2113,11 +2163,23 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
}); });
} }
void Storage::SetIsolationLevel(IsolationLevel isolation_level) { utils::BasicResult<Storage::SetIsolationLevelError> Storage::SetIsolationLevel(IsolationLevel isolation_level) {
std::unique_lock main_guard{main_lock_}; std::unique_lock main_guard{main_lock_};
if (storage_mode_ == storage::StorageMode::IN_MEMORY_ANALYTICAL) {
return Storage::SetIsolationLevelError::DisabledForAnalyticalMode;
}
isolation_level_ = isolation_level; isolation_level_ = isolation_level;
return {};
} }
void Storage::SetStorageMode(StorageMode storage_mode) {
std::unique_lock main_guard{main_lock_};
storage_mode_ = storage_mode;
}
StorageMode Storage::GetStorageMode() { return storage_mode_; }
void Storage::RestoreReplicas() { void Storage::RestoreReplicas() {
MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole()); MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole());
if (!ShouldStoreAndRestoreReplicas()) { if (!ShouldStoreAndRestoreReplicas()) {

View File

@ -33,6 +33,7 @@
#include "storage/v2/mvcc.hpp" #include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp" #include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/result.hpp" #include "storage/v2/result.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/transaction.hpp" #include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp" #include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp" #include "storage/v2/vertex_accessor.hpp"
@ -200,7 +201,7 @@ class Storage final {
private: private:
friend class Storage; friend class Storage;
explicit Accessor(Storage *storage, IsolationLevel isolation_level); explicit Accessor(Storage *storage, IsolationLevel isolation_level, StorageMode storage_mode);
public: public:
Accessor(const Accessor &) = delete; Accessor(const Accessor &) = delete;
@ -368,7 +369,7 @@ class Storage final {
}; };
Accessor Access(std::optional<IsolationLevel> override_isolation_level = {}) { Accessor Access(std::optional<IsolationLevel> override_isolation_level = {}) {
return Accessor{this, override_isolation_level.value_or(isolation_level_)}; return Accessor{this, override_isolation_level.value_or(isolation_level_), storage_mode_};
} }
const std::string &LabelToName(LabelId label) const; const std::string &LabelToName(LabelId label) const;
@ -509,14 +510,24 @@ class Storage final {
void FreeMemory(); void FreeMemory();
void SetIsolationLevel(IsolationLevel isolation_level); enum class SetIsolationLevelError : uint8_t { DisabledForAnalyticalMode };
enum class CreateSnapshotError : uint8_t { DisabledForReplica }; utils::BasicResult<SetIsolationLevelError> SetIsolationLevel(IsolationLevel isolation_level);
utils::BasicResult<CreateSnapshotError> CreateSnapshot(); void SetStorageMode(StorageMode storage_mode);
StorageMode GetStorageMode();
enum class CreateSnapshotError : uint8_t {
DisabledForReplica,
DisabledForAnalyticsPeriodicCommit,
ReachedMaxNumTries
};
utils::BasicResult<CreateSnapshotError> CreateSnapshot(std::optional<bool> is_periodic);
private: private:
Transaction CreateTransaction(IsolationLevel isolation_level); Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode);
/// The force parameter determines the behaviour of the garbage collector. /// The force parameter determines the behaviour of the garbage collector.
/// If it's set to true, it will behave as a global operation, i.e. it can't /// If it's set to true, it will behave as a global operation, i.e. it can't
@ -582,6 +593,7 @@ class Storage final {
utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_; utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_;
IsolationLevel isolation_level_; IsolationLevel isolation_level_;
StorageMode storage_mode_;
Config config_; Config config_;
utils::Scheduler gc_runner_; utils::Scheduler gc_runner_;

View File

@ -0,0 +1,20 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
namespace memgraph::storage {
enum class StorageMode : std::uint8_t { IN_MEMORY_ANALYTICAL, IN_MEMORY_TRANSACTIONAL };
} // namespace memgraph::storage

View File

@ -22,6 +22,7 @@
#include "storage/v2/edge.hpp" #include "storage/v2/edge.hpp"
#include "storage/v2/isolation_level.hpp" #include "storage/v2/isolation_level.hpp"
#include "storage/v2/property_value.hpp" #include "storage/v2/property_value.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex.hpp" #include "storage/v2/vertex.hpp"
#include "storage/v2/view.hpp" #include "storage/v2/view.hpp"
@ -31,12 +32,14 @@ const uint64_t kTimestampInitialId = 0;
const uint64_t kTransactionInitialId = 1ULL << 63U; const uint64_t kTransactionInitialId = 1ULL << 63U;
struct Transaction { struct Transaction {
Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level) Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level,
StorageMode storage_mode)
: transaction_id(transaction_id), : transaction_id(transaction_id),
start_timestamp(start_timestamp), start_timestamp(start_timestamp),
command_id(0), command_id(0),
must_abort(false), must_abort(false),
isolation_level(isolation_level) {} isolation_level(isolation_level),
storage_mode(storage_mode) {}
Transaction(Transaction &&other) noexcept Transaction(Transaction &&other) noexcept
: transaction_id(other.transaction_id.load(std::memory_order_acquire)), : transaction_id(other.transaction_id.load(std::memory_order_acquire)),
@ -45,7 +48,8 @@ struct Transaction {
command_id(other.command_id), command_id(other.command_id),
deltas(std::move(other.deltas)), deltas(std::move(other.deltas)),
must_abort(other.must_abort), must_abort(other.must_abort),
isolation_level(other.isolation_level) {} isolation_level(other.isolation_level),
storage_mode(other.storage_mode) {}
Transaction(const Transaction &) = delete; Transaction(const Transaction &) = delete;
Transaction &operator=(const Transaction &) = delete; Transaction &operator=(const Transaction &) = delete;
@ -70,6 +74,7 @@ struct Transaction {
std::list<Delta> deltas; std::list<Delta> deltas;
bool must_abort; bool must_abort;
IsolationLevel isolation_level; IsolationLevel isolation_level;
StorageMode storage_mode;
}; };
inline bool operator==(const Transaction &first, const Transaction &second) { inline bool operator==(const Transaction &first, const Transaction &second) {

View File

@ -222,6 +222,7 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
// current code always follows the logical pattern of "create a delta" and // current code always follows the logical pattern of "create a delta" and
// "modify in-place". Additionally, the created delta will make other // "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR. // transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value); CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value);
vertex_->properties.SetProperty(property, value); vertex_->properties.SetProperty(property, value);

57
src/utils/enum.hpp Normal file
View File

@ -0,0 +1,57 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <optional>
#include <string>
#include "utils/result.hpp"
#include "utils/string.hpp"
namespace memgraph::utils {
enum class ValidationError : uint8_t { EmptyValue, InvalidValue };
// Returns joined string representations for every enum in the mapping.
std::string GetAllowedEnumValuesString(const auto &mappings) {
std::vector<std::string> allowed_values;
allowed_values.reserve(mappings.size());
std::transform(mappings.begin(), mappings.end(), std::back_inserter(allowed_values),
[](const auto &mapping) { return std::string(mapping.first); });
return memgraph::utils::Join(allowed_values, ", ");
}
// Checks if the string value can be represented as an enum.
// If not, the BasicResult will contain an error.
memgraph::utils::BasicResult<ValidationError> IsValidEnumValueString(const auto &value, const auto &mappings) {
if (value.empty()) {
return ValidationError::EmptyValue;
}
if (std::find_if(mappings.begin(), mappings.end(), [&](const auto &mapping) { return mapping.first == value; }) ==
mappings.cend()) {
return ValidationError::InvalidValue;
}
return {};
}
// Tries to convert a string into enum, which would then contain a value if the conversion
// has been successful.
template <typename Enum>
std::optional<Enum> StringToEnum(const auto &value, const auto &mappings) {
const auto mapping_iter =
std::find_if(mappings.begin(), mappings.end(), [&](const auto &mapping) { return mapping.first == value; });
if (mapping_iter == mappings.cend()) {
return std::nullopt;
}
return mapping_iter->second;
}
} // namespace memgraph::utils

View File

@ -171,6 +171,7 @@ enum class TypeId : uint64_t {
AST_FREE_MEMORY_QUERY, AST_FREE_MEMORY_QUERY,
AST_TRIGGER_QUERY, AST_TRIGGER_QUERY,
AST_ISOLATION_LEVEL_QUERY, AST_ISOLATION_LEVEL_QUERY,
AST_STORAGE_MODE_QUERY,
AST_CREATE_SNAPSHOT_QUERY, AST_CREATE_SNAPSHOT_QUERY,
AST_STREAM_QUERY, AST_STREAM_QUERY,
AST_SETTING_QUERY, AST_SETTING_QUERY,

View File

@ -98,6 +98,7 @@ startup_config_dict = {
"IP address on which the websocket server for Memgraph monitoring should listen.", "IP address on which the websocket server for Memgraph monitoring should listen.",
), ),
"monitoring_port": ("7444", "7444", "Port on which the websocket server for Memgraph monitoring should listen."), "monitoring_port": ("7444", "7444", "Port on which the websocket server for Memgraph monitoring should listen."),
"password_encryption_algorithm": ("bcrypt", "bcrypt", "The password encryption algorithm used for authentication."),
"pulsar_service_url": ("", "", "Default URL used while connecting to Pulsar brokers."), "pulsar_service_url": ("", "", "Default URL used while connecting to Pulsar brokers."),
"query_execution_timeout_sec": ( "query_execution_timeout_sec": (
"600", "600",

View File

@ -37,6 +37,7 @@ BASIC_PRIVILEGES = [
"WEBSOCKET", "WEBSOCKET",
"MODULE_WRITE", "MODULE_WRITE",
"TRANSACTION_MANAGEMENT", "TRANSACTION_MANAGEMENT",
"STORAGE_MODE",
] ]
@ -60,7 +61,7 @@ def test_lba_procedures_show_privileges_first_user():
cursor = connect(username="Josip", password="").cursor() cursor = connect(username="Josip", password="").cursor()
result = execute_and_fetch_all(cursor, "SHOW PRIVILEGES FOR Josip;") result = execute_and_fetch_all(cursor, "SHOW PRIVILEGES FOR Josip;")
assert len(result) == 31 assert len(result) == 32
fine_privilege_results = [res for res in result if res[0] not in BASIC_PRIVILEGES] fine_privilege_results = [res for res in result if res[0] not in BASIC_PRIVILEGES]

View File

@ -30,3 +30,6 @@ add_subdirectory(env_variable_check)
#flag check binaries #flag check binaries
add_subdirectory(flag_check) add_subdirectory(flag_check)
#flag check binaries
add_subdirectory(storage_mode)

View File

@ -0,0 +1,6 @@
set(target_name memgraph__integration__storage_mode)
set(tester_target_name ${target_name}__tester)
add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-communication)

View File

@ -0,0 +1,196 @@
import argparse
import atexit
import os
import subprocess
import sys
import tempfile
import time
from typing import List
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
node_queries = [
"CREATE (:Label {prop:'1'})",
"CREATE (:Label {prop:'2'})",
]
edge_queries = ["MATCH (l1:Label),(l2:Label) WHERE l1.prop = '1' AND l2.prop = '2' CREATE (l1)-[r:edgeType1]->(l2)"]
assertion_queries = [
f"MATCH (n) WITH count(n) as cnt RETURN assert(cnt={len(node_queries)});",
f"MATCH (n)-[e]->(m) WITH count(e) as cnt RETURN assert(cnt={len(edge_queries)});",
]
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def prepare_memgraph(memgraph_args):
# Start the memgraph binary
memgraph = subprocess.Popen(list(map(str, memgraph_args)))
time.sleep(0.1)
assert memgraph.poll() is None, "Memgraph process died prematurely!"
wait_for_server(7687)
return memgraph
def terminate_memgraph(memgraph):
memgraph.terminate()
time.sleep(0.1)
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
def execute_tester(
binary, queries, should_fail=False, failure_message="", username="", password="", check_failure=True
):
args = [binary, "--username", username, "--password", password]
if should_fail:
args.append("--should-fail")
if failure_message:
args.extend(["--failure-message", failure_message])
if check_failure:
args.append("--check-failure")
args.extend(queries)
subprocess.run(args).check_returncode()
def execute_test_analytical_mode(memgraph_binary: str, tester_binary: str) -> None:
def execute_queries(queries):
return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="")
storage_directory = tempfile.TemporaryDirectory()
memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name])
print("\033[1;36m~~ Starting creating & loading snapshot test ~~\033[0m")
execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"])
# Prepare all nodes
execute_queries(node_queries)
# Prepare all edges
execute_queries(edge_queries)
execute_queries(["CREATE SNAPSHOT;"])
print("\033[1;36m~~ Created snapshot ~~\033[0m\n")
# Shutdown the memgraph binary with wait
terminate_memgraph(memgraph)
# Start the memgraph binary
memgraph = prepare_memgraph(
[memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"]
)
execute_queries(assertion_queries)
memgraph.terminate()
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
def execute_test_switch_analytical_transactional(memgraph_binary: str, tester_binary: str) -> None:
def execute_queries(queries):
return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="")
storage_directory = tempfile.TemporaryDirectory()
# Start the memgraph binary
memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name])
print("\033[1;36m~~ Starting switch storage modes test ~~\033[0m")
# switch to IN_MEMORY_ANALYTICAL
execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"])
# Prepare all nodes
execute_queries(node_queries)
# switch back to IN_MEMORY_TRANSACTIONAL
execute_queries(["STORAGE MODE IN_MEMORY_TRANSACTIONAL"])
# Prepare all edges
execute_queries(edge_queries)
execute_queries(["CREATE SNAPSHOT;"])
print("\033[1;36m~~ Created snapshot ~~\033[0m\n")
print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n")
# Shutdown the memgraph binary with wait
terminate_memgraph(memgraph)
print("\033[1;36m~~ Starting memgraph with snapshot recovery ~~\033[0m\n")
memgraph = prepare_memgraph(
[memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"]
)
execute_queries(assertion_queries)
print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n")
memgraph.terminate()
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
def execute_test_switch_transactional_analytical(memgraph_binary: str, tester_binary: str) -> None:
def execute_queries(queries):
return execute_tester(tester_binary, queries, should_fail=False, check_failure=True, username="", password="")
storage_directory = tempfile.TemporaryDirectory()
# Start the memgraph binary
memgraph = prepare_memgraph([memgraph_binary, "--data-directory", storage_directory.name])
print("\033[1;36m~~ Starting switch storage modes test ~~\033[0m")
# Prepare all nodes
execute_queries(node_queries)
# switch to IN_MEMORY_ANALYTICAL
execute_queries(["STORAGE MODE IN_MEMORY_ANALYTICAL"])
# Prepare all edges
execute_queries(edge_queries)
execute_queries(["CREATE SNAPSHOT;"])
print("\033[1;36m~~ Created snapshot ~~\033[0m\n")
print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n")
# Shutdown the memgraph binary with wait
terminate_memgraph(memgraph)
print("\033[1;36m~~ Starting memgraph with snapshot recovery ~~\033[0m\n")
memgraph = prepare_memgraph(
[memgraph_binary, "--data-directory", storage_directory.name, "--storage-recover-on-startup=true"]
)
execute_queries(assertion_queries)
print("\033[1;36m~~ Terminating memgraph ~~\033[0m\n")
memgraph.terminate()
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
if __name__ == "__main__":
memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
tester_binary = os.path.join(PROJECT_DIR, "build", "tests", "integration", "storage_mode", "tester")
parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--tester", default=tester_binary)
args = parser.parse_args()
execute_test_analytical_mode(args.memgraph, args.tester)
execute_test_switch_analytical_transactional(args.memgraph, args.tester)
execute_test_switch_transactional_analytical(args.memgraph, args.tester)
sys.exit(0)

View File

@ -0,0 +1,85 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gflags/gflags.h>
#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_bool(check_failure, false, "Set to true to enable failure checking.");
DEFINE_bool(should_fail, false, "Set to true to expect a failure.");
DEFINE_string(failure_message, "", "Set to the expected failure message.");
/**
* Executes queries passed as positional arguments and verifies whether they
* succeeded, failed, failed with a specific error message or executed without a
* specific error occurring.
*/
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
memgraph::communication::SSLInit sslInit;
memgraph::io::network::Endpoint endpoint(memgraph::io::network::ResolveHostname(FLAGS_address), FLAGS_port);
memgraph::communication::ClientContext context(FLAGS_use_ssl);
memgraph::communication::bolt::Client client(context);
client.Connect(endpoint, FLAGS_username, FLAGS_password);
for (int i = 1; i < argc; ++i) {
std::string query(argv[i]);
std::cout << query << std::endl;
try {
client.Execute(query, {});
} catch (const memgraph::communication::bolt::ClientQueryException &e) {
if (!FLAGS_check_failure) {
if (!FLAGS_failure_message.empty() && e.what() == FLAGS_failure_message) {
LOG_FATAL(
"The query should have succeeded or failed with an error "
"message that isn't equal to '{}' but it failed with that error "
"message",
FLAGS_failure_message);
}
continue;
}
if (FLAGS_should_fail) {
if (!FLAGS_failure_message.empty() && e.what() != FLAGS_failure_message) {
LOG_FATAL(
"The query should have failed with an error message of '{}'' but "
"instead it failed with '{}'",
FLAGS_failure_message, e.what());
}
return 0;
} else {
LOG_FATAL(
"The query shoudn't have failed but it failed with an "
"error message '{}'",
e.what());
}
}
if (!FLAGS_check_failure) continue;
if (FLAGS_should_fail) {
LOG_FATAL(
"The query should have failed but instead it executed "
"successfully!");
}
}
return 0;
}

View File

@ -337,6 +337,8 @@ void ExecuteWorkload(
std::vector<std::thread> threads; std::vector<std::thread> threads;
threads.reserve(FLAGS_num_workers); threads.reserve(FLAGS_num_workers);
auto total_time_start = std::chrono::steady_clock::now();
std::vector<uint64_t> worker_retries(FLAGS_num_workers, 0); std::vector<uint64_t> worker_retries(FLAGS_num_workers, 0);
std::vector<Metadata> worker_metadata(FLAGS_num_workers, Metadata()); std::vector<Metadata> worker_metadata(FLAGS_num_workers, Metadata());
std::vector<double> worker_duration(FLAGS_num_workers, 0.0); std::vector<double> worker_duration(FLAGS_num_workers, 0.0);
@ -398,8 +400,12 @@ void ExecuteWorkload(
final_duration += worker_duration[i]; final_duration += worker_duration[i];
} }
auto total_time_end = std::chrono::steady_clock::now();
auto total_time = std::chrono::duration_cast<std::chrono::duration<double>>(total_time_end - total_time_start);
final_duration /= FLAGS_num_workers; final_duration /= FLAGS_num_workers;
nlohmann::json summary = nlohmann::json::object(); nlohmann::json summary = nlohmann::json::object();
summary["total_time"] = total_time.count();
summary["count"] = queries.size(); summary["count"] = queries.size();
summary["duration"] = final_duration; summary["duration"] = final_duration;
summary["throughput"] = static_cast<double>(queries.size()) / final_duration; summary["throughput"] = static_cast<double>(queries.size()) / final_duration;

View File

@ -319,7 +319,7 @@ add_unit_test(storage_v2_property_store.cpp)
target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2 fmt) target_link_libraries(${test_prefix}storage_v2_property_store mg-storage-v2 fmt)
add_unit_test(storage_v2_wal_file.cpp) add_unit_test(storage_v2_wal_file.cpp)
target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 fmt) target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 storage_test_utils fmt)
add_unit_test(storage_v2_replication.cpp) add_unit_test(storage_v2_replication.cpp)
target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt) target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
@ -327,6 +327,9 @@ target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
add_unit_test(storage_v2_isolation_level.cpp) add_unit_test(storage_v2_isolation_level.cpp)
target_link_libraries(${test_prefix}storage_v2_isolation_level mg-storage-v2) target_link_libraries(${test_prefix}storage_v2_isolation_level mg-storage-v2)
add_unit_test(storage_v2_storage_mode.cpp)
target_link_libraries(${test_prefix}storage_v2_storage_mode mg-storage-v2 storage_test_utils mg-query mg-glue)
add_unit_test(replication_persistence_helper.cpp) add_unit_test(replication_persistence_helper.cpp)
target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2) target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2)

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -28,6 +28,7 @@ namespace fs = std::filesystem;
DECLARE_bool(auth_password_permit_null); DECLARE_bool(auth_password_permit_null);
DECLARE_string(auth_password_strength_regex); DECLARE_string(auth_password_strength_regex);
DECLARE_string(password_encryption_algorithm);
class AuthWithStorage : public ::testing::Test { class AuthWithStorage : public ::testing::Test {
protected: protected:
@ -55,23 +56,22 @@ TEST_F(AuthWithStorage, AddRole) {
TEST_F(AuthWithStorage, RemoveRole) { TEST_F(AuthWithStorage, RemoveRole) {
ASSERT_TRUE(auth.AddRole("admin")); ASSERT_TRUE(auth.AddRole("admin"));
ASSERT_TRUE(auth.RemoveRole("admin")); ASSERT_TRUE(auth.RemoveRole("admin"));
ASSERT_FALSE(auth.RemoveRole("user")); class AuthWithStorage : public ::testing::Test {
ASSERT_FALSE(auth.RemoveRole("user")); protected:
} virtual void SetUp() {
memgraph::utils::EnsureDir(test_folder_);
FLAGS_auth_password_permit_null = true;
FLAGS_auth_password_strength_regex = ".+";
TEST_F(AuthWithStorage, AddUser) { memgraph::license::global_license_checker.EnableTesting();
ASSERT_FALSE(auth.HasUsers()); }
ASSERT_TRUE(auth.AddUser("test"));
ASSERT_TRUE(auth.HasUsers());
ASSERT_TRUE(auth.AddUser("test2"));
ASSERT_FALSE(auth.AddUser("test"));
}
TEST_F(AuthWithStorage, RemoveUser) { virtual void TearDown() { fs::remove_all(test_folder_); }
ASSERT_FALSE(auth.HasUsers());
ASSERT_TRUE(auth.AddUser("test")); fs::path test_folder_{fs::temp_directory_path() / "MG_tests_unit_auth"};
ASSERT_TRUE(auth.HasUsers());
ASSERT_TRUE(auth.RemoveUser("test")); Auth auth{test_folder_ / ("unit_auth_test_" + std::to_string(static_cast<int>(getpid())))};
};
ASSERT_FALSE(auth.HasUsers()); ASSERT_FALSE(auth.HasUsers());
ASSERT_FALSE(auth.RemoveUser("test2")); ASSERT_FALSE(auth.RemoveUser("test2"));
ASSERT_FALSE(auth.RemoveUser("test")); ASSERT_FALSE(auth.RemoveUser("test"));
@ -926,3 +926,86 @@ TEST(AuthWithoutStorage, Crypto) {
ASSERT_TRUE(VerifyPassword("hello", hash)); ASSERT_TRUE(VerifyPassword("hello", hash));
ASSERT_FALSE(VerifyPassword("hello1", hash)); ASSERT_FALSE(VerifyPassword("hello1", hash));
} }
class AuthWithVariousEncryptionAlgorithms : public ::testing::Test {
protected:
virtual void SetUp() { FLAGS_password_encryption_algorithm = "bcrypt"; }
};
TEST_F(AuthWithVariousEncryptionAlgorithms, VerifyPasswordDefault) {
auto hash = EncryptPassword("hello");
ASSERT_TRUE(VerifyPassword("hello", hash));
ASSERT_FALSE(VerifyPassword("hello1", hash));
}
TEST_F(AuthWithVariousEncryptionAlgorithms, VerifyPasswordSHA256) {
FLAGS_password_encryption_algorithm = "sha256";
auto hash = EncryptPassword("hello");
ASSERT_TRUE(VerifyPassword("hello", hash));
ASSERT_FALSE(VerifyPassword("hello1", hash));
}
TEST_F(AuthWithVariousEncryptionAlgorithms, VerifyPasswordSHA256_1024) {
FLAGS_password_encryption_algorithm = "sha256-multiple";
auto hash = EncryptPassword("hello");
ASSERT_TRUE(VerifyPassword("hello", hash));
ASSERT_FALSE(VerifyPassword("hello1", hash));
}
TEST_F(AuthWithVariousEncryptionAlgorithms, VerifyPasswordThrow) {
FLAGS_password_encryption_algorithm = "abcd";
ASSERT_THROW(EncryptPassword("hello"), AuthException);
}
TEST_F(AuthWithVariousEncryptionAlgorithms, VerifyPasswordEmptyEncryptionThrow) {
FLAGS_password_encryption_algorithm = "";
ASSERT_THROW(EncryptPassword("hello"), AuthException);
}
class AuthWithStorageWithVariousEncryptionAlgorithms : public ::testing::Test {
protected:
virtual void SetUp() {
memgraph::utils::EnsureDir(test_folder_);
FLAGS_auth_password_permit_null = true;
FLAGS_auth_password_strength_regex = ".+";
FLAGS_password_encryption_algorithm = "bcrypt";
memgraph::license::global_license_checker.EnableTesting();
}
virtual void TearDown() { fs::remove_all(test_folder_); }
fs::path test_folder_{fs::temp_directory_path() / "MG_tests_unit_auth"};
Auth auth{test_folder_ / ("unit_auth_test_" + std::to_string(static_cast<int>(getpid())))};
};
TEST_F(AuthWithStorageWithVariousEncryptionAlgorithms, AddUserDefault) {
auto user = auth.AddUser("Alice", "alice");
ASSERT_TRUE(user);
ASSERT_EQ(user->username(), "alice");
}
TEST_F(AuthWithStorageWithVariousEncryptionAlgorithms, AddUserSha256) {
FLAGS_password_encryption_algorithm = "sha256";
auto user = auth.AddUser("Alice", "alice");
ASSERT_TRUE(user);
ASSERT_EQ(user->username(), "alice");
}
TEST_F(AuthWithStorageWithVariousEncryptionAlgorithms, AddUserSha256_1024) {
FLAGS_password_encryption_algorithm = "sha256-multiple";
auto user = auth.AddUser("Alice", "alice");
ASSERT_TRUE(user);
ASSERT_EQ(user->username(), "alice");
}
TEST_F(AuthWithStorageWithVariousEncryptionAlgorithms, AddUserThrow) {
FLAGS_password_encryption_algorithm = "abcd";
ASSERT_THROW(auth.AddUser("Alice", "alice"), AuthException);
}
TEST_F(AuthWithStorageWithVariousEncryptionAlgorithms, AddUserEmptyPasswordEncryptionThrow) {
FLAGS_password_encryption_algorithm = "";
ASSERT_THROW(auth.AddUser("Alice", "alice"), AuthException);
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -17,4 +17,13 @@ size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, mem
for (auto it = vertices.begin(); it != vertices.end(); ++it, ++count) for (auto it = vertices.begin(); it != vertices.end(); ++it, ++count)
; ;
return count; return count;
} }
std::string_view StorageModeToString(memgraph::storage::StorageMode storage_mode) {
switch (storage_mode) {
case memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL:
return "IN_MEMORY_ANALYTICAL";
case memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL:
return "IN_MEMORY_TRANSACTIONAL";
}
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -14,4 +14,9 @@
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "storage/v2/view.hpp" #include "storage/v2/view.hpp"
size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, memgraph::storage::View view); size_t CountVertices(memgraph::storage::Storage::Accessor &storage_accessor, memgraph::storage::View view);
std::string_view StorageModeToString(memgraph::storage::StorageMode storage_mode);
inline constexpr std::array storage_modes{memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL,
memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL};

View File

@ -0,0 +1,143 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gtest/gtest.h>
#include <chrono>
#include <stop_token>
#include <string>
#include <string_view>
#include <thread>
#include "interpreter_faker.hpp"
#include "query/exceptions.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "storage_test_utils.hpp"
class StorageModeTest : public ::testing::TestWithParam<memgraph::storage::StorageMode> {
public:
struct PrintStringParamToName {
std::string operator()(const testing::TestParamInfo<memgraph::storage::StorageMode> &info) {
return std::string(StorageModeToString(static_cast<memgraph::storage::StorageMode>(info.param)));
}
};
};
// you should be able to see nodes if there is analytics mode
TEST_P(StorageModeTest, Mode) {
const memgraph::storage::StorageMode storage_mode = GetParam();
memgraph::storage::Storage storage{
{.transaction{.isolation_level = memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION}}};
storage.SetStorageMode(storage_mode);
auto creator = storage.Access();
auto other_analytics_mode_reader = storage.Access();
ASSERT_EQ(CountVertices(creator, memgraph::storage::View::OLD), 0);
ASSERT_EQ(CountVertices(other_analytics_mode_reader, memgraph::storage::View::OLD), 0);
static constexpr int vertex_creation_count = 10;
{
for (size_t i = 1; i <= vertex_creation_count; i++) {
creator.CreateVertex();
int64_t expected_vertices_count = storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL ? i : 0;
ASSERT_EQ(CountVertices(creator, memgraph::storage::View::OLD), expected_vertices_count);
ASSERT_EQ(CountVertices(other_analytics_mode_reader, memgraph::storage::View::OLD), expected_vertices_count);
}
}
ASSERT_FALSE(creator.Commit().HasError());
}
INSTANTIATE_TEST_CASE_P(ParameterizedStorageModeTests, StorageModeTest, ::testing::ValuesIn(storage_modes),
StorageModeTest::PrintStringParamToName());
class StorageModeMultiTxTest : public ::testing::Test {
protected:
memgraph::storage::Storage db_;
std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "MG_tests_unit_storage_mode"};
memgraph::query::InterpreterContext interpreter_context{&db_, {}, data_directory};
InterpreterFaker running_interpreter{&interpreter_context}, main_interpreter{&interpreter_context};
};
TEST_F(StorageModeMultiTxTest, ModeSwitchInactiveTransaction) {
bool started = false;
std::jthread running_thread = std::jthread(
[this, &started](std::stop_token st, int thread_index) {
running_interpreter.Interpret("CREATE ();");
started = true;
},
0);
{
while (!started) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL);
main_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL");
// should change state
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL);
// finish thread
running_thread.request_stop();
}
}
TEST_F(StorageModeMultiTxTest, ModeSwitchActiveTransaction) {
// transactional state
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL);
main_interpreter.Interpret("BEGIN");
bool started = false;
bool finished = false;
std::jthread running_thread = std::jthread(
[this, &started, &finished](std::stop_token st, int thread_index) {
started = true;
// running interpreter try to change
running_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL");
finished = true;
},
0);
{
while (!started) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// should not change still
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL);
main_interpreter.Interpret("COMMIT");
while (!finished) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
// should change state
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL);
// finish thread
running_thread.request_stop();
}
}
TEST_F(StorageModeMultiTxTest, ErrorChangeIsolationLevel) {
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL);
main_interpreter.Interpret("STORAGE MODE IN_MEMORY_ANALYTICAL");
// should change state
ASSERT_EQ(db_.GetStorageMode(), memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL);
ASSERT_THROW(running_interpreter.Interpret("SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;"),
memgraph::query::IsolationLevelModificationInAnalyticsException);
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -22,6 +22,7 @@
#include "storage/v2/durability/wal.hpp" #include "storage/v2/durability/wal.hpp"
#include "storage/v2/mvcc.hpp" #include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp" #include "storage/v2/name_id_mapper.hpp"
#include "storage_test_utils.hpp"
#include "utils/file.hpp" #include "utils/file.hpp"
#include "utils/file_locker.hpp" #include "utils/file_locker.hpp"
#include "utils/uuid.hpp" #include "utils/uuid.hpp"
@ -58,15 +59,18 @@ class DeltaGenerator final {
explicit Transaction(DeltaGenerator *gen) explicit Transaction(DeltaGenerator *gen)
: gen_(gen), : gen_(gen),
transaction_(gen->transaction_id_++, gen->timestamp_++, transaction_(gen->transaction_id_++, gen->timestamp_++, memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION,
memgraph::storage::IsolationLevel::SNAPSHOT_ISOLATION) {} gen->storage_mode_) {}
public: public:
memgraph::storage::Vertex *CreateVertex() { memgraph::storage::Vertex *CreateVertex() {
auto gid = memgraph::storage::Gid::FromUint(gen_->vertices_count_++); auto gid = memgraph::storage::Gid::FromUint(gen_->vertices_count_++);
auto delta = memgraph::storage::CreateDeleteObjectDelta(&transaction_); auto delta = memgraph::storage::CreateDeleteObjectDelta(&transaction_);
auto &it = gen_->vertices_.emplace_back(gid, delta); auto &it = gen_->vertices_.emplace_back(gid, delta);
delta->prev.Set(&it); if (delta != nullptr) {
delta->prev.Set(&it);
}
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return &it;
{ {
memgraph::storage::durability::WalDeltaData data; memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_CREATE; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_CREATE;
@ -78,6 +82,7 @@ class DeltaGenerator final {
void DeleteVertex(memgraph::storage::Vertex *vertex) { void DeleteVertex(memgraph::storage::Vertex *vertex) {
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::RecreateObjectTag()); memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::RecreateObjectTag());
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
{ {
memgraph::storage::durability::WalDeltaData data; memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_DELETE; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_DELETE;
@ -91,6 +96,7 @@ class DeltaGenerator final {
vertex->labels.push_back(label_id); vertex->labels.push_back(label_id);
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::RemoveLabelTag(), memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::RemoveLabelTag(),
label_id); label_id);
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
{ {
memgraph::storage::durability::WalDeltaData data; memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_ADD_LABEL; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_ADD_LABEL;
@ -104,6 +110,7 @@ class DeltaGenerator final {
auto label_id = memgraph::storage::LabelId::FromUint(gen_->mapper_.NameToId(label)); auto label_id = memgraph::storage::LabelId::FromUint(gen_->mapper_.NameToId(label));
vertex->labels.erase(std::find(vertex->labels.begin(), vertex->labels.end(), label_id)); vertex->labels.erase(std::find(vertex->labels.begin(), vertex->labels.end(), label_id));
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::AddLabelTag(), label_id); memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::AddLabelTag(), label_id);
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
{ {
memgraph::storage::durability::WalDeltaData data; memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_REMOVE_LABEL; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_REMOVE_LABEL;
@ -121,6 +128,7 @@ class DeltaGenerator final {
memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::SetPropertyTag(), memgraph::storage::CreateAndLinkDelta(&transaction_, &*vertex, memgraph::storage::Delta::SetPropertyTag(),
property_id, old_value); property_id, old_value);
props.SetProperty(property_id, value); props.SetProperty(property_id, value);
if (transaction_.storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) return;
{ {
memgraph::storage::durability::WalDeltaData data; memgraph::storage::durability::WalDeltaData data;
data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY; data.type = memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY;
@ -136,6 +144,7 @@ class DeltaGenerator final {
void Finalize(bool append_transaction_end = true) { void Finalize(bool append_transaction_end = true) {
auto commit_timestamp = gen_->timestamp_++; auto commit_timestamp = gen_->timestamp_++;
if (transaction_.deltas.empty()) return;
for (const auto &delta : transaction_.deltas) { for (const auto &delta : transaction_.deltas) {
auto owner = delta.prev.Get(); auto owner = delta.prev.Get();
while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) { while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) {
@ -183,12 +192,14 @@ class DeltaGenerator final {
using DataT = std::vector<std::pair<uint64_t, memgraph::storage::durability::WalDeltaData>>; using DataT = std::vector<std::pair<uint64_t, memgraph::storage::durability::WalDeltaData>>;
DeltaGenerator(const std::filesystem::path &data_directory, bool properties_on_edges, uint64_t seq_num) DeltaGenerator(const std::filesystem::path &data_directory, bool properties_on_edges, uint64_t seq_num,
memgraph::storage::StorageMode storage_mode = memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL)
: uuid_(memgraph::utils::GenerateUUID()), : uuid_(memgraph::utils::GenerateUUID()),
epoch_id_(memgraph::utils::GenerateUUID()), epoch_id_(memgraph::utils::GenerateUUID()),
seq_num_(seq_num), seq_num_(seq_num),
wal_file_(data_directory, uuid_, epoch_id_, {.properties_on_edges = properties_on_edges}, &mapper_, seq_num, wal_file_(data_directory, uuid_, epoch_id_, {.properties_on_edges = properties_on_edges}, &mapper_, seq_num,
&file_retainer_) {} &file_retainer_),
storage_mode_(storage_mode) {}
Transaction CreateTransaction() { return Transaction(this); } Transaction CreateTransaction() { return Transaction(this); }
@ -274,6 +285,8 @@ class DeltaGenerator final {
uint64_t valid_{true}; uint64_t valid_{true};
memgraph::utils::FileRetainer file_retainer_; memgraph::utils::FileRetainer file_retainer_;
memgraph::storage::StorageMode storage_mode_;
}; };
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) // NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
@ -621,3 +634,64 @@ TEST_P(WalFileTest, PartialData) {
ASSERT_EQ(pos, infos.size() - 2); ASSERT_EQ(pos, infos.size() - 2);
AssertWalInfoEqual(infos[infos.size() - 1].second, memgraph::storage::durability::ReadWalInfo(current_file)); AssertWalInfoEqual(infos[infos.size() - 1].second, memgraph::storage::durability::ReadWalInfo(current_file));
} }
class StorageModeWalFileTest : public ::testing::TestWithParam<memgraph::storage::StorageMode> {
public:
StorageModeWalFileTest() {}
void SetUp() override { Clear(); }
void TearDown() override { Clear(); }
std::vector<std::filesystem::path> GetFilesList() {
std::vector<std::filesystem::path> ret;
for (auto &item : std::filesystem::directory_iterator(storage_directory)) {
ret.push_back(item.path());
}
std::sort(ret.begin(), ret.end());
std::reverse(ret.begin(), ret.end());
return ret;
}
std::filesystem::path storage_directory{std::filesystem::temp_directory_path() / "MG_test_unit_storage_v2_wal_file"};
struct PrintStringParamToName {
std::string operator()(const testing::TestParamInfo<memgraph::storage::StorageMode> &info) {
return std::string(StorageModeToString(static_cast<memgraph::storage::StorageMode>(info.param)));
}
};
private:
void Clear() {
if (!std::filesystem::exists(storage_directory)) return;
std::filesystem::remove_all(storage_directory);
}
};
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(StorageModeWalFileTest, StorageModeData) {
std::vector<std::pair<uint64_t, memgraph::storage::durability::WalInfo>> infos;
const memgraph::storage::StorageMode storage_mode = GetParam();
{
DeltaGenerator gen(storage_directory, true, 5, storage_mode);
auto tx = gen.CreateTransaction();
tx.CreateVertex();
tx.Finalize(true);
infos.emplace_back(gen.GetPosition(), gen.GetInfo());
size_t num_expected_deltas = storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL ? 0 : 2;
ASSERT_EQ(infos[0].second.num_deltas, num_expected_deltas);
auto wal_files = GetFilesList();
size_t num_expected_wal_files = 1;
ASSERT_EQ(num_expected_wal_files, wal_files.size());
if (storage_mode == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) {
DeltaGenerator gen_empty(storage_directory, true, 5, storage_mode);
ASSERT_EQ(gen.GetPosition(), gen_empty.GetPosition());
}
}
}
INSTANTIATE_TEST_CASE_P(ParameterizedWalStorageModeTests, StorageModeWalFileTest, ::testing::ValuesIn(storage_modes),
StorageModeWalFileTest::PrintStringParamToName());