Multiple isolation levels (#171)
This PR introduces READ COMMITTED and READ UNCOMMITTED isolation levels. The isolation level can be set with a config or with a query for different scopes.
This commit is contained in:
parent
90a093bd95
commit
8cd9f696cf
@ -87,6 +87,10 @@ modifications:
|
||||
value: "0"
|
||||
override: true
|
||||
|
||||
- name: "isolation_level"
|
||||
value: "SNAPSHOT_ISOLATION"
|
||||
override: true
|
||||
|
||||
undocumented:
|
||||
- "flag_file"
|
||||
- "also_log_to_stderr"
|
||||
|
@ -49,6 +49,8 @@ std::string PermissionToString(Permission permission) {
|
||||
return "FREE_MEMORY";
|
||||
case Permission::TRIGGER:
|
||||
return "TRIGGER";
|
||||
case Permission::CONFIG:
|
||||
return "CONFIG";
|
||||
case Permission::AUTH:
|
||||
return "AUTH";
|
||||
}
|
||||
|
@ -26,16 +26,18 @@ enum class Permission : uint64_t {
|
||||
READ_FILE = 1U << 12U,
|
||||
FREE_MEMORY = 1U << 13U,
|
||||
TRIGGER = 1U << 14U,
|
||||
CONFIG = 1U << 15U,
|
||||
AUTH = 1U << 16U
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
// Constant list of all available permissions.
|
||||
const std::vector<Permission> kPermissionsAll = {
|
||||
Permission::MATCH, Permission::CREATE, Permission::MERGE, Permission::DELETE,
|
||||
Permission::SET, Permission::REMOVE, Permission::INDEX, Permission::STATS,
|
||||
Permission::CONSTRAINT, Permission::DUMP, Permission::AUTH, Permission::REPLICATION,
|
||||
Permission::LOCK_PATH, Permission::READ_FILE, Permission::FREE_MEMORY, Permission::TRIGGER};
|
||||
const std::vector<Permission> kPermissionsAll = {Permission::MATCH, Permission::CREATE, Permission::MERGE,
|
||||
Permission::DELETE, Permission::SET, Permission::REMOVE,
|
||||
Permission::INDEX, Permission::STATS, Permission::CONSTRAINT,
|
||||
Permission::DUMP, Permission::AUTH, Permission::REPLICATION,
|
||||
Permission::LOCK_PATH, Permission::READ_FILE, Permission::FREE_MEMORY,
|
||||
Permission::TRIGGER, Permission::CONFIG};
|
||||
|
||||
// Function that converts a permission to its string representation.
|
||||
std::string PermissionToString(Permission permission);
|
||||
|
@ -34,6 +34,8 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) {
|
||||
return auth::Permission::FREE_MEMORY;
|
||||
case query::AuthQuery::Privilege::TRIGGER:
|
||||
return auth::Permission::TRIGGER;
|
||||
case query::AuthQuery::Privilege::CONFIG:
|
||||
return auth::Permission::CONFIG;
|
||||
case query::AuthQuery::Privilege::AUTH:
|
||||
return auth::Permission::AUTH;
|
||||
}
|
||||
|
175
src/memgraph.cpp
175
src/memgraph.cpp
@ -11,6 +11,7 @@
|
||||
#include <optional>
|
||||
#include <regex>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <thread>
|
||||
|
||||
#include <fmt/format.h>
|
||||
@ -28,6 +29,7 @@
|
||||
#include "query/procedure/module.hpp"
|
||||
#include "query/procedure/py_module.hpp"
|
||||
#include "requests/requests.hpp"
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
#include "telemetry/telemetry.hpp"
|
||||
@ -67,6 +69,42 @@
|
||||
#include "glue/auth.hpp"
|
||||
#endif
|
||||
|
||||
namespace {
|
||||
std::string GetAllowedEnumValuesString(const auto &mappings) {
|
||||
std::vector<std::string> allowed_values;
|
||||
allowed_values.reserve(mappings.size());
|
||||
std::transform(mappings.begin(), mappings.end(), std::back_inserter(allowed_values),
|
||||
[](const auto &mapping) { return std::string(mapping.first); });
|
||||
return utils::Join(allowed_values, ", ");
|
||||
}
|
||||
|
||||
enum class ValidationError : uint8_t { EmptyValue, InvalidValue };
|
||||
|
||||
utils::BasicResult<ValidationError> IsValidEnumValueString(const auto &value, const auto &mappings) {
|
||||
if (value.empty()) {
|
||||
return ValidationError::EmptyValue;
|
||||
}
|
||||
|
||||
if (std::find_if(mappings.begin(), mappings.end(), [&](const auto &mapping) { return mapping.first == value; }) ==
|
||||
mappings.cend()) {
|
||||
return ValidationError::InvalidValue;
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
template <typename Enum>
|
||||
std::optional<Enum> StringToEnum(const auto &value, const auto &mappings) {
|
||||
const auto mapping_iter =
|
||||
std::find_if(mappings.begin(), mappings.end(), [&](const auto &mapping) { return mapping.first == value; });
|
||||
if (mapping_iter == mappings.cend()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return mapping_iter->second;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// Bolt server flags.
|
||||
DEFINE_string(bolt_address, "0.0.0.0", "IP address on which the Bolt server should listen.");
|
||||
DEFINE_VALIDATED_int32(bolt_port, 7687, "Port on which the Bolt server should listen.",
|
||||
@ -140,6 +178,72 @@ DEFINE_uint64(query_execution_timeout_sec, 180,
|
||||
"Maximum allowed query execution time. Queries exceeding this "
|
||||
"limit will be aborted. Value of 0 means no limit.");
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint64(
|
||||
memory_limit, 0,
|
||||
"Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical memory if the swap "
|
||||
"is enabled and 90\% of the physical memory otherwise.");
|
||||
|
||||
namespace {
|
||||
using namespace std::literals;
|
||||
constexpr std::array isolation_level_mappings{
|
||||
std::pair{"SNAPSHOT_ISOLATION"sv, storage::IsolationLevel::SNAPSHOT_ISOLATION},
|
||||
std::pair{"READ_COMMITTED"sv, storage::IsolationLevel::READ_COMMITTED},
|
||||
std::pair{"READ_UNCOMMITTED"sv, storage::IsolationLevel::READ_UNCOMMITTED}};
|
||||
|
||||
const std::string isolation_level_help_string =
|
||||
fmt::format("Default isolation level used for the transactions. Allowed values: {}",
|
||||
GetAllowedEnumValuesString(isolation_level_mappings));
|
||||
} // namespace
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_VALIDATED_string(isolation_level, "SNAPSHOT_ISOLATION", isolation_level_help_string.c_str(), {
|
||||
if (const auto result = IsValidEnumValueString(value, isolation_level_mappings); result.HasError()) {
|
||||
const auto error = result.GetError();
|
||||
switch (error) {
|
||||
case ValidationError::EmptyValue: {
|
||||
std::cout << "Isolation level cannot be empty." << std::endl;
|
||||
break;
|
||||
}
|
||||
case ValidationError::InvalidValue: {
|
||||
std::cout << "Invalid value for isolation level. Allowed values: "
|
||||
<< GetAllowedEnumValuesString(isolation_level_mappings) << std::endl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
namespace {
|
||||
storage::IsolationLevel ParseIsolationLevel() {
|
||||
const auto isolation_level = StringToEnum<storage::IsolationLevel>(FLAGS_isolation_level, isolation_level_mappings);
|
||||
MG_ASSERT(isolation_level, "Invalid isolation level");
|
||||
return *isolation_level;
|
||||
}
|
||||
|
||||
int64_t GetMemoryLimit() {
|
||||
if (FLAGS_memory_limit == 0) {
|
||||
auto maybe_total_memory = utils::sysinfo::TotalMemory();
|
||||
MG_ASSERT(maybe_total_memory, "Failed to fetch the total physical memory");
|
||||
const auto maybe_swap_memory = utils::sysinfo::SwapTotalMemory();
|
||||
MG_ASSERT(maybe_swap_memory, "Failed to fetch the total swap memory");
|
||||
|
||||
if (*maybe_swap_memory == 0) {
|
||||
// take only 90% of the total memory
|
||||
*maybe_total_memory *= 9;
|
||||
*maybe_total_memory /= 10;
|
||||
}
|
||||
return *maybe_total_memory * 1024;
|
||||
}
|
||||
|
||||
// We parse the memory as MiB every time
|
||||
return FLAGS_memory_limit * 1024 * 1024;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace {
|
||||
std::vector<std::filesystem::path> query_modules_directories;
|
||||
} // namespace
|
||||
@ -168,37 +272,30 @@ DEFINE_VALIDATED_string(query_modules_directory, "",
|
||||
DEFINE_bool(also_log_to_stderr, false, "Log messages go to stderr in addition to logfiles");
|
||||
DEFINE_string(log_file, "", "Path to where the log should be stored.");
|
||||
|
||||
DEFINE_uint64(
|
||||
memory_limit, 0,
|
||||
"Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical memory if the swap "
|
||||
"is enabled and 90\% of the physical memory otherwise.");
|
||||
namespace {
|
||||
constexpr std::array log_level_mappings{
|
||||
std::pair{"TRACE", spdlog::level::trace}, std::pair{"DEBUG", spdlog::level::debug},
|
||||
std::pair{"INFO", spdlog::level::info}, std::pair{"WARNING", spdlog::level::warn},
|
||||
std::pair{"ERROR", spdlog::level::err}, std::pair{"CRITICAL", spdlog::level::critical}};
|
||||
|
||||
std::string GetAllowedLogLevelsString() {
|
||||
std::vector<std::string> allowed_log_levels;
|
||||
allowed_log_levels.reserve(log_level_mappings.size());
|
||||
std::transform(log_level_mappings.cbegin(), log_level_mappings.cend(), std::back_inserter(allowed_log_levels),
|
||||
[](const auto &mapping) { return mapping.first; });
|
||||
return utils::Join(allowed_log_levels, ", ");
|
||||
}
|
||||
std::pair{"TRACE"sv, spdlog::level::trace}, std::pair{"DEBUG"sv, spdlog::level::debug},
|
||||
std::pair{"INFO"sv, spdlog::level::info}, std::pair{"WARNING"sv, spdlog::level::warn},
|
||||
std::pair{"ERROR"sv, spdlog::level::err}, std::pair{"CRITICAL"sv, spdlog::level::critical}};
|
||||
|
||||
const std::string log_level_help_string =
|
||||
fmt::format("Minimum log level. Allowed values: {}", GetAllowedLogLevelsString());
|
||||
fmt::format("Minimum log level. Allowed values: {}", GetAllowedEnumValuesString(log_level_mappings));
|
||||
} // namespace
|
||||
|
||||
DEFINE_VALIDATED_string(log_level, "WARNING", log_level_help_string.c_str(), {
|
||||
if (value.empty()) {
|
||||
std::cout << "Log level cannot be empty." << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (std::find_if(log_level_mappings.cbegin(), log_level_mappings.cend(),
|
||||
[&](const auto &mapping) { return mapping.first == value; }) == log_level_mappings.cend()) {
|
||||
std::cout << "Invalid value for log level. Allowed values: " << GetAllowedLogLevelsString() << std::endl;
|
||||
if (const auto result = IsValidEnumValueString(value, log_level_mappings); result.HasError()) {
|
||||
const auto error = result.GetError();
|
||||
switch (error) {
|
||||
case ValidationError::EmptyValue: {
|
||||
std::cout << "Log level cannot be empty." << std::endl;
|
||||
break;
|
||||
}
|
||||
case ValidationError::InvalidValue: {
|
||||
std::cout << "Invalid value for log level. Allowed values: " << GetAllowedEnumValuesString(log_level_mappings)
|
||||
<< std::endl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -207,11 +304,9 @@ DEFINE_VALIDATED_string(log_level, "WARNING", log_level_help_string.c_str(), {
|
||||
|
||||
namespace {
|
||||
void ParseLogLevel() {
|
||||
const auto mapping_iter = std::find_if(log_level_mappings.cbegin(), log_level_mappings.cend(),
|
||||
[](const auto &mapping) { return mapping.first == FLAGS_log_level; });
|
||||
MG_ASSERT(mapping_iter != log_level_mappings.cend(), "Invalid log level");
|
||||
|
||||
spdlog::set_level(mapping_iter->second);
|
||||
const auto log_level = StringToEnum<spdlog::level::level_enum>(FLAGS_log_level, log_level_mappings);
|
||||
MG_ASSERT(log_level, "Invalid log level");
|
||||
spdlog::set_level(*log_level);
|
||||
}
|
||||
|
||||
// 5 weeks * 7 days
|
||||
@ -241,25 +336,6 @@ void ConfigureLogging() {
|
||||
spdlog::flush_on(spdlog::level::trace);
|
||||
ParseLogLevel();
|
||||
}
|
||||
|
||||
int64_t GetMemoryLimit() {
|
||||
if (FLAGS_memory_limit == 0) {
|
||||
auto maybe_total_memory = utils::sysinfo::TotalMemory();
|
||||
MG_ASSERT(maybe_total_memory, "Failed to fetch the total physical memory");
|
||||
const auto maybe_swap_memory = utils::sysinfo::SwapTotalMemory();
|
||||
MG_ASSERT(maybe_swap_memory, "Failed to fetch the total swap memory");
|
||||
|
||||
if (*maybe_swap_memory == 0) {
|
||||
// take only 90% of the total memory
|
||||
*maybe_total_memory *= 9;
|
||||
*maybe_total_memory /= 10;
|
||||
}
|
||||
return *maybe_total_memory * 1024;
|
||||
}
|
||||
|
||||
// We parse the memory as MiB every time
|
||||
return FLAGS_memory_limit * 1024 * 1024;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
/// Encapsulates Dbms and Interpreter that are passed through the network server
|
||||
@ -962,7 +1038,8 @@ int main(int argc, char **argv) {
|
||||
.snapshot_retention_count = FLAGS_storage_snapshot_retention_count,
|
||||
.wal_file_size_kibibytes = FLAGS_storage_wal_file_size_kib,
|
||||
.wal_file_flush_every_n_tx = FLAGS_storage_wal_file_flush_every_n_tx,
|
||||
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit}};
|
||||
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit},
|
||||
.transaction = {.isolation_level = ParseIsolationLevel()}};
|
||||
if (FLAGS_storage_snapshot_interval_sec == 0) {
|
||||
if (FLAGS_storage_wal_enabled) {
|
||||
LOG_FATAL(
|
||||
|
@ -175,4 +175,10 @@ class TriggerModificationInMulticommandTxException : public QueryException {
|
||||
TriggerModificationInMulticommandTxException()
|
||||
: QueryException("Trigger queries not allowed in multicommand transactions.") {}
|
||||
};
|
||||
|
||||
class IsolationLevelModificationInMulticommandTxException : public QueryException {
|
||||
public:
|
||||
IsolationLevelModificationInMulticommandTxException()
|
||||
: QueryException("Isolation level cannot be modified in multicommand transactions.") {}
|
||||
};
|
||||
} // namespace query
|
||||
|
@ -2193,7 +2193,7 @@ cpp<#
|
||||
(:serialize))
|
||||
(lcp:define-enum privilege
|
||||
(create delete match merge set remove index stats auth constraint
|
||||
dump replication lock_path read_file free_memory trigger)
|
||||
dump replication lock_path read_file free_memory trigger config)
|
||||
(:serialize))
|
||||
#>cpp
|
||||
AuthQuery() = default;
|
||||
@ -2232,7 +2232,8 @@ const std::vector<AuthQuery::Privilege> kPrivilegesAll = {
|
||||
AuthQuery::Privilege::REPLICATION,
|
||||
AuthQuery::Privilege::READ_FILE,
|
||||
AuthQuery::Privilege::LOCK_PATH,
|
||||
AuthQuery::Privilege::FREE_MEMORY, AuthQuery::Privilege::TRIGGER};
|
||||
AuthQuery::Privilege::FREE_MEMORY, AuthQuery::Privilege::TRIGGER,
|
||||
AuthQuery::Privilege::CONFIG};
|
||||
cpp<#
|
||||
|
||||
(lcp:define-class info-query (query)
|
||||
@ -2432,4 +2433,27 @@ cpp<#
|
||||
(:serialize (:slk))
|
||||
(:clone))
|
||||
|
||||
(lcp:define-class isolation-level-query (query)
|
||||
((isolation_level "IsolationLevel" :scope :public)
|
||||
(isolation_level_scope "IsolationLevelScope" :scope :public))
|
||||
|
||||
(:public
|
||||
(lcp:define-enum isolation-level
|
||||
(snapshot-isolation read-committed read-uncommitted)
|
||||
(:serialize))
|
||||
(lcp:define-enum isolation-level-scope
|
||||
(next session global)
|
||||
(:serialize))
|
||||
#>cpp
|
||||
IsolationLevelQuery() = default;
|
||||
|
||||
DEFVISITABLE(QueryVisitor<void>);
|
||||
cpp<#)
|
||||
(:private
|
||||
#>cpp
|
||||
friend class AstStorage;
|
||||
cpp<#)
|
||||
(:serialize (:slk))
|
||||
(:clone))
|
||||
|
||||
(lcp:pop-namespace) ;; namespace query
|
||||
|
@ -77,6 +77,7 @@ class LockPathQuery;
|
||||
class LoadCsv;
|
||||
class FreeMemoryQuery;
|
||||
class TriggerQuery;
|
||||
class IsolationLevelQuery;
|
||||
|
||||
using TreeCompositeVisitor = ::utils::CompositeVisitor<
|
||||
SingleQuery, CypherUnion, NamedExpression, OrOperator, XorOperator, AndOperator, NotOperator, AdditionOperator,
|
||||
@ -110,6 +111,6 @@ class ExpressionVisitor
|
||||
template <class TResult>
|
||||
class QueryVisitor : public ::utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery, IndexQuery, AuthQuery,
|
||||
InfoQuery, ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery,
|
||||
FreeMemoryQuery, TriggerQuery> {};
|
||||
FreeMemoryQuery, TriggerQuery, IsolationLevelQuery> {};
|
||||
|
||||
} // namespace query
|
||||
|
@ -412,6 +412,33 @@ antlrcpp::Any CypherMainVisitor::visitShowTriggers(MemgraphCypher::ShowTriggersC
|
||||
return trigger_query;
|
||||
}
|
||||
|
||||
antlrcpp::Any CypherMainVisitor::visitIsolationLevelQuery(MemgraphCypher::IsolationLevelQueryContext *ctx) {
|
||||
auto *isolation_level_query = storage_->Create<IsolationLevelQuery>();
|
||||
|
||||
isolation_level_query->isolation_level_scope_ = [scope = ctx->isolationLevelScope()]() {
|
||||
if (scope->GLOBAL()) {
|
||||
return IsolationLevelQuery::IsolationLevelScope::GLOBAL;
|
||||
}
|
||||
if (scope->SESSION()) {
|
||||
return IsolationLevelQuery::IsolationLevelScope::SESSION;
|
||||
}
|
||||
return IsolationLevelQuery::IsolationLevelScope::NEXT;
|
||||
}();
|
||||
|
||||
isolation_level_query->isolation_level_ = [level = ctx->isolationLevel()]() {
|
||||
if (level->SNAPSHOT()) {
|
||||
return IsolationLevelQuery::IsolationLevel::SNAPSHOT_ISOLATION;
|
||||
}
|
||||
if (level->COMMITTED()) {
|
||||
return IsolationLevelQuery::IsolationLevel::READ_COMMITTED;
|
||||
}
|
||||
return IsolationLevelQuery::IsolationLevel::READ_UNCOMMITTED;
|
||||
}();
|
||||
|
||||
query_ = isolation_level_query;
|
||||
return isolation_level_query;
|
||||
}
|
||||
|
||||
antlrcpp::Any CypherMainVisitor::visitCypherUnion(MemgraphCypher::CypherUnionContext *ctx) {
|
||||
bool distinct = !ctx->ALL();
|
||||
auto *cypher_union = storage_->Create<CypherUnion>(distinct);
|
||||
@ -844,6 +871,7 @@ antlrcpp::Any CypherMainVisitor::visitPrivilege(MemgraphCypher::PrivilegeContext
|
||||
if (ctx->READ_FILE()) return AuthQuery::Privilege::READ_FILE;
|
||||
if (ctx->FREE_MEMORY()) return AuthQuery::Privilege::FREE_MEMORY;
|
||||
if (ctx->TRIGGER()) return AuthQuery::Privilege::TRIGGER;
|
||||
if (ctx->CONFIG()) return AuthQuery::Privilege::CONFIG;
|
||||
LOG_FATAL("Should not get here - unknown privilege!");
|
||||
}
|
||||
|
||||
|
@ -238,6 +238,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
|
||||
*/
|
||||
antlrcpp::Any visitShowTriggers(MemgraphCypher::ShowTriggersContext *ctx) override;
|
||||
|
||||
/**
|
||||
* @return IsolationLevelQuery*
|
||||
*/
|
||||
antlrcpp::Any visitIsolationLevelQuery(MemgraphCypher::IsolationLevelQueryContext *ctx) override;
|
||||
|
||||
/**
|
||||
* @return CypherUnion*
|
||||
*/
|
||||
|
@ -14,8 +14,10 @@ memgraphCypherKeyword : cypherKeyword
|
||||
| BAD
|
||||
| BEFORE
|
||||
| CLEAR
|
||||
| CONFIG
|
||||
| CSV
|
||||
| COMMIT
|
||||
| COMMITTED
|
||||
| DATA
|
||||
| DELIMITER
|
||||
| DATABASE
|
||||
@ -26,17 +28,22 @@ memgraphCypherKeyword : cypherKeyword
|
||||
| FOR
|
||||
| FREE
|
||||
| FROM
|
||||
| GLOBAL
|
||||
| GRANT
|
||||
| HEADER
|
||||
| IDENTIFIED
|
||||
| ISOLATION
|
||||
| LEVEL
|
||||
| LOAD
|
||||
| LOCK
|
||||
| MAIN
|
||||
| MODE
|
||||
| NEXT
|
||||
| NO
|
||||
| PASSWORD
|
||||
| PORT
|
||||
| PRIVILEGES
|
||||
| READ
|
||||
| REGISTER
|
||||
| REPLICA
|
||||
| REPLICAS
|
||||
@ -45,12 +52,16 @@ memgraphCypherKeyword : cypherKeyword
|
||||
| ROLE
|
||||
| ROLES
|
||||
| QUOTE
|
||||
| SESSION
|
||||
| SNAPSHOT
|
||||
| STATS
|
||||
| SYNC
|
||||
| TRANSACTION
|
||||
| TRIGGER
|
||||
| TRIGGERS
|
||||
| TIMEOUT
|
||||
| TO
|
||||
| UNCOMMITTED
|
||||
| UNLOCK
|
||||
| UPDATE
|
||||
| USER
|
||||
@ -74,6 +85,7 @@ query : cypherQuery
|
||||
| lockPathQuery
|
||||
| freeMemoryQuery
|
||||
| triggerQuery
|
||||
| isolationLevelQuery
|
||||
;
|
||||
|
||||
authQuery : createRole
|
||||
@ -175,6 +187,7 @@ privilege : CREATE
|
||||
| READ_FILE
|
||||
| FREE_MEMORY
|
||||
| TRIGGER
|
||||
| CONFIG
|
||||
;
|
||||
|
||||
privilegeList : privilege ( ',' privilege )* ;
|
||||
@ -222,3 +235,9 @@ createTrigger : CREATE TRIGGER triggerName ( ON ( emptyVertex | emptyEdge ) ? (
|
||||
dropTrigger : DROP TRIGGER triggerName ;
|
||||
|
||||
showTriggers : SHOW TRIGGERS ;
|
||||
|
||||
isolationLevel : SNAPSHOT ISOLATION | READ COMMITTED | READ UNCOMMITTED ;
|
||||
|
||||
isolationLevelScope : GLOBAL | SESSION | NEXT ;
|
||||
|
||||
isolationLevelQuery : SET isolationLevelScope TRANSACTION ISOLATION LEVEL isolationLevel ;
|
||||
|
@ -20,6 +20,8 @@ BAD : B A D ;
|
||||
BEFORE : B E F O R E ;
|
||||
CLEAR : C L E A R ;
|
||||
COMMIT : C O M M I T ;
|
||||
COMMITTED : C O M M I T T E D ;
|
||||
CONFIG : C O N F I G ;
|
||||
CSV : C S V ;
|
||||
DATA : D A T A ;
|
||||
DELIMITER : D E L I M I T E R ;
|
||||
@ -33,20 +35,25 @@ FOR : F O R ;
|
||||
FREE : F R E E ;
|
||||
FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ;
|
||||
FROM : F R O M ;
|
||||
GLOBAL : G L O B A L ;
|
||||
GRANT : G R A N T ;
|
||||
GRANTS : G R A N T S ;
|
||||
HEADER : H E A D E R ;
|
||||
IDENTIFIED : I D E N T I F I E D ;
|
||||
IGNORE : I G N O R E ;
|
||||
ISOLATION : I S O L A T I O N ;
|
||||
LEVEL : L E V E L ;
|
||||
LOAD : L O A D ;
|
||||
LOCK : L O C K ;
|
||||
LOCK_PATH : L O C K UNDERSCORE P A T H ;
|
||||
MAIN : M A I N ;
|
||||
MODE : M O D E ;
|
||||
NEXT : N E X T ;
|
||||
NO : N O ;
|
||||
PASSWORD : P A S S W O R D ;
|
||||
PORT : P O R T ;
|
||||
PRIVILEGES : P R I V I L E G E S ;
|
||||
READ : R E A D ;
|
||||
READ_FILE : R E A D UNDERSCORE F I L E ;
|
||||
REGISTER : R E G I S T E R ;
|
||||
REPLICA : R E P L I C A ;
|
||||
@ -56,12 +63,16 @@ REVOKE : R E V O K E ;
|
||||
ROLE : R O L E ;
|
||||
ROLES : R O L E S ;
|
||||
QUOTE : Q U O T E ;
|
||||
SESSION : S E S S I O N ;
|
||||
SNAPSHOT : S N A P S H O T ;
|
||||
STATS : S T A T S ;
|
||||
SYNC : S Y N C ;
|
||||
TIMEOUT : T I M E O U T ;
|
||||
TO : T O ;
|
||||
TRANSACTION : T R A N S A C T I O N ;
|
||||
TRIGGER : T R I G G E R ;
|
||||
TRIGGERS : T R I G G E R S ;
|
||||
UNCOMMITTED : U N C O M M I T T E D ;
|
||||
UNLOCK : U N L O C K ;
|
||||
UPDATE : U P D A T E ;
|
||||
USER : U S E R ;
|
||||
|
@ -57,6 +57,8 @@ class PrivilegeExtractor : public QueryVisitor<void>, public HierarchicalTreeVis
|
||||
|
||||
void Visit(ReplicationQuery &replication_query) override { AddPrivilege(AuthQuery::Privilege::REPLICATION); }
|
||||
|
||||
void Visit(IsolationLevelQuery &isolation_level_query) override { AddPrivilege(AuthQuery::Privilege::CONFIG); }
|
||||
|
||||
bool PreVisit(Create & /*unused*/) override {
|
||||
AddPrivilege(AuthQuery::Privilege::CREATE);
|
||||
return false;
|
||||
|
@ -79,17 +79,19 @@ class Trie {
|
||||
const int kBitsetSize = 65536;
|
||||
|
||||
const trie::Trie kKeywords = {
|
||||
"union", "all", "optional", "match", "unwind", "as", "merge", "on",
|
||||
"create", "set", "detach", "delete", "remove", "with", "distinct", "return",
|
||||
"order", "by", "skip", "limit", "ascending", "asc", "descending", "desc",
|
||||
"where", "or", "xor", "and", "not", "in", "starts", "ends",
|
||||
"contains", "is", "null", "case", "when", "then", "else", "end",
|
||||
"count", "filter", "extract", "any", "none", "single", "true", "false",
|
||||
"reduce", "coalesce", "user", "password", "alter", "drop", "show", "stats",
|
||||
"unique", "explain", "profile", "storage", "index", "info", "exists", "assert",
|
||||
"constraint", "node", "key", "dump", "database", "call", "yield", "memory",
|
||||
"mb", "kb", "unlimited", "free", "procedure", "query", "free_memory", "read_file",
|
||||
"lock_path", "after", "before", "execute", "transaction", "trigger", "triggers", "update"};
|
||||
"union", "all", "optional", "match", "unwind", "as", "merge", "on",
|
||||
"create", "set", "detach", "delete", "remove", "with", "distinct", "return",
|
||||
"order", "by", "skip", "limit", "ascending", "asc", "descending", "desc",
|
||||
"where", "or", "xor", "and", "not", "in", "starts", "ends",
|
||||
"contains", "is", "null", "case", "when", "then", "else", "end",
|
||||
"count", "filter", "extract", "any", "none", "single", "true", "false",
|
||||
"reduce", "coalesce", "user", "password", "alter", "drop", "show", "stats",
|
||||
"unique", "explain", "profile", "storage", "index", "info", "exists", "assert",
|
||||
"constraint", "node", "key", "dump", "database", "call", "yield", "memory",
|
||||
"mb", "kb", "unlimited", "free", "procedure", "query", "free_memory", "read_file",
|
||||
"lock_path", "after", "before", "execute", "transaction", "trigger", "triggers", "update",
|
||||
"comitted", "uncomitted", "global", "isolation", "level", "next", "read", "session",
|
||||
"snapshot", "transaction"};
|
||||
|
||||
// Unicode codepoints that are allowed at the start of the unescaped name.
|
||||
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(
|
||||
|
@ -622,7 +622,8 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
|
||||
in_explicit_transaction_ = true;
|
||||
expect_rollback_ = false;
|
||||
|
||||
db_accessor_ = std::make_unique<storage::Storage::Accessor>(interpreter_context_->db->Access());
|
||||
db_accessor_ =
|
||||
std::make_unique<storage::Storage::Accessor>(interpreter_context_->db->Access(GetIsolationLevelOverride()));
|
||||
execution_db_accessor_.emplace(db_accessor_.get());
|
||||
|
||||
if (interpreter_context_->trigger_store->HasTriggers()) {
|
||||
@ -1162,6 +1163,50 @@ PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, const bool in_explic
|
||||
// NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks)
|
||||
}
|
||||
|
||||
constexpr auto ToStorageIsolationLevel(const IsolationLevelQuery::IsolationLevel isolation_level) noexcept {
|
||||
switch (isolation_level) {
|
||||
case IsolationLevelQuery::IsolationLevel::SNAPSHOT_ISOLATION:
|
||||
return storage::IsolationLevel::SNAPSHOT_ISOLATION;
|
||||
case IsolationLevelQuery::IsolationLevel::READ_COMMITTED:
|
||||
return storage::IsolationLevel::READ_COMMITTED;
|
||||
case IsolationLevelQuery::IsolationLevel::READ_UNCOMMITTED:
|
||||
return storage::IsolationLevel::READ_UNCOMMITTED;
|
||||
}
|
||||
}
|
||||
|
||||
PreparedQuery PrepareIsolationLevelQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
||||
InterpreterContext *interpreter_context, Interpreter *interpreter) {
|
||||
if (in_explicit_transaction) {
|
||||
throw IsolationLevelModificationInMulticommandTxException();
|
||||
}
|
||||
|
||||
auto *isolation_level_query = utils::Downcast<IsolationLevelQuery>(parsed_query.query);
|
||||
MG_ASSERT(isolation_level_query);
|
||||
|
||||
const auto isolation_level = ToStorageIsolationLevel(isolation_level_query->isolation_level_);
|
||||
|
||||
auto callback = [isolation_level_query, isolation_level, interpreter_context,
|
||||
interpreter]() -> std::function<void()> {
|
||||
switch (isolation_level_query->isolation_level_scope_) {
|
||||
case IsolationLevelQuery::IsolationLevelScope::GLOBAL:
|
||||
return [interpreter_context, isolation_level] { interpreter_context->db->SetIsolationLevel(isolation_level); };
|
||||
case IsolationLevelQuery::IsolationLevelScope::SESSION:
|
||||
return [interpreter, isolation_level] { interpreter->SetSessionIsolationLevel(isolation_level); };
|
||||
case IsolationLevelQuery::IsolationLevelScope::NEXT:
|
||||
return [interpreter, isolation_level] { interpreter->SetNextTransactionIsolationLevel(isolation_level); };
|
||||
}
|
||||
}();
|
||||
|
||||
return PreparedQuery{
|
||||
{},
|
||||
std::move(parsed_query.required_privileges),
|
||||
[callback = std::move(callback)](AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
|
||||
callback();
|
||||
return QueryHandlerResult::COMMIT;
|
||||
},
|
||||
RWType::NONE};
|
||||
}
|
||||
|
||||
PreparedQuery PrepareInfoQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
|
||||
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
|
||||
storage::Storage *db, utils::MemoryResource *execution_memory) {
|
||||
@ -1452,7 +1497,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
(utils::Downcast<CypherQuery>(parsed_query.query) || utils::Downcast<ExplainQuery>(parsed_query.query) ||
|
||||
utils::Downcast<ProfileQuery>(parsed_query.query) || utils::Downcast<DumpQuery>(parsed_query.query) ||
|
||||
utils::Downcast<TriggerQuery>(parsed_query.query))) {
|
||||
db_accessor_ = std::make_unique<storage::Storage::Accessor>(interpreter_context_->db->Access());
|
||||
db_accessor_ =
|
||||
std::make_unique<storage::Storage::Accessor>(interpreter_context_->db->Access(GetIsolationLevelOverride()));
|
||||
execution_db_accessor_.emplace(db_accessor_.get());
|
||||
|
||||
if (utils::Downcast<CypherQuery>(parsed_query.query) && interpreter_context_->trigger_store->HasTriggers()) {
|
||||
@ -1503,6 +1549,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
|
||||
} else if (utils::Downcast<TriggerQuery>(parsed_query.query)) {
|
||||
prepared_query = PrepareTriggerQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
|
||||
&*execution_db_accessor_, params);
|
||||
} else if (utils::Downcast<IsolationLevelQuery>(parsed_query.query)) {
|
||||
prepared_query =
|
||||
PrepareIsolationLevelQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, this);
|
||||
} else {
|
||||
LOG_FATAL("Should not get here -- unknown query type!");
|
||||
}
|
||||
@ -1690,4 +1739,22 @@ void Interpreter::AbortCommand(std::unique_ptr<QueryExecution> *query_execution)
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<storage::IsolationLevel> Interpreter::GetIsolationLevelOverride() {
|
||||
if (next_transaction_isolation_level) {
|
||||
const auto isolation_level = *next_transaction_isolation_level;
|
||||
next_transaction_isolation_level.reset();
|
||||
return isolation_level;
|
||||
}
|
||||
|
||||
return interpreter_isolation_level;
|
||||
}
|
||||
|
||||
void Interpreter::SetNextTransactionIsolationLevel(const storage::IsolationLevel isolation_level) {
|
||||
next_transaction_isolation_level.emplace(isolation_level);
|
||||
}
|
||||
|
||||
void Interpreter::SetSessionIsolationLevel(const storage::IsolationLevel isolation_level) {
|
||||
interpreter_isolation_level.emplace(isolation_level);
|
||||
}
|
||||
|
||||
} // namespace query
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "query/stream.hpp"
|
||||
#include "query/trigger.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "utils/event_counter.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
@ -254,6 +255,9 @@ class Interpreter final {
|
||||
|
||||
void RollbackTransaction();
|
||||
|
||||
void SetNextTransactionIsolationLevel(storage::IsolationLevel isolation_level);
|
||||
void SetSessionIsolationLevel(storage::IsolationLevel isolation_level);
|
||||
|
||||
/**
|
||||
* Abort the current multicommand transaction.
|
||||
*/
|
||||
@ -306,10 +310,14 @@ class Interpreter final {
|
||||
bool in_explicit_transaction_{false};
|
||||
bool expect_rollback_{false};
|
||||
|
||||
std::optional<storage::IsolationLevel> interpreter_isolation_level;
|
||||
std::optional<storage::IsolationLevel> next_transaction_isolation_level;
|
||||
|
||||
PreparedQuery PrepareTransactionQuery(std::string_view query_upper);
|
||||
void Commit();
|
||||
void AdvanceCommand();
|
||||
void AbortCommand(std::unique_ptr<QueryExecution> *query_execution);
|
||||
std::optional<storage::IsolationLevel> GetIsolationLevelOverride();
|
||||
|
||||
size_t ActiveQueryExecutions() {
|
||||
return std::count_if(query_executions_.begin(), query_executions_.end(),
|
||||
|
@ -3,6 +3,8 @@
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <filesystem>
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
@ -38,6 +40,10 @@ struct Config {
|
||||
bool snapshot_on_exit{false};
|
||||
|
||||
} durability;
|
||||
|
||||
struct Transaction {
|
||||
IsolationLevel isolation_level{IsolationLevel::SNAPSHOT_ISOLATION};
|
||||
} transaction;
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
9
src/storage/v2/isolation_level.hpp
Normal file
9
src/storage/v2/isolation_level.hpp
Normal file
@ -0,0 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
namespace storage {
|
||||
|
||||
enum class IsolationLevel : std::uint8_t { SNAPSHOT_ISOLATION, READ_COMMITTED, READ_UNCOMMITTED };
|
||||
|
||||
} // namespace storage
|
@ -24,8 +24,18 @@ inline void ApplyDeltasForRead(Transaction *transaction, const Delta *delta, Vie
|
||||
auto ts = delta->timestamp->load(std::memory_order_acquire);
|
||||
auto cid = delta->command_id;
|
||||
|
||||
// This is a committed change that we see so we shouldn't undo it.
|
||||
if (ts < transaction->start_timestamp) {
|
||||
// For SNAPSHOT ISOLATION -> we can only see the changes which were committed before the start of the current
|
||||
// transaction
|
||||
//
|
||||
// For READ COMMITTED -> we can only see the changes which are committed. Commit timestamps of
|
||||
// uncommitted changes are set to the transaction id of the transaction that made the change. Transaction id is
|
||||
// always higher than start or commit timestamps so we know if the timestamp is lower than the initial transaction
|
||||
// id value, that the change is committed.
|
||||
//
|
||||
// For READ UNCOMMITTED -> we accept any change.
|
||||
if ((transaction->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION && ts < transaction->start_timestamp) ||
|
||||
(transaction->isolation_level == IsolationLevel::READ_COMMITTED && ts < kTransactionInitialId) ||
|
||||
(transaction->isolation_level == IsolationLevel::READ_UNCOMMITTED)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -286,6 +286,7 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const {
|
||||
|
||||
Storage::Storage(Config config)
|
||||
: indices_(&constraints_, config.items),
|
||||
isolation_level_(config.transaction.isolation_level),
|
||||
config_(config),
|
||||
snapshot_directory_(config_.durability.storage_directory / durability::kSnapshotDirectory),
|
||||
wal_directory_(config_.durability.storage_directory / durability::kWalDirectory),
|
||||
@ -394,13 +395,13 @@ Storage::~Storage() {
|
||||
}
|
||||
}
|
||||
|
||||
Storage::Accessor::Accessor(Storage *storage)
|
||||
Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level)
|
||||
: storage_(storage),
|
||||
// The lock must be acquired before creating the transaction object to
|
||||
// prevent freshly created transactions from dangling in an active state
|
||||
// during exclusive operations.
|
||||
storage_guard_(storage_->main_lock_),
|
||||
transaction_(storage->CreateTransaction()),
|
||||
transaction_(storage->CreateTransaction(isolation_level)),
|
||||
is_transaction_active_(true),
|
||||
config_(storage->config_.items) {}
|
||||
|
||||
@ -1227,7 +1228,7 @@ VerticesIterable Storage::Accessor::Vertices(LabelId label, PropertyId property,
|
||||
storage_->indices_.label_property_index.Vertices(label, property, lower_bound, upper_bound, view, &transaction_));
|
||||
}
|
||||
|
||||
Transaction Storage::CreateTransaction() {
|
||||
Transaction Storage::CreateTransaction(IsolationLevel isolation_level) {
|
||||
// We acquire the transaction engine lock here because we access (and
|
||||
// modify) the transaction engine variables (`transaction_id` and
|
||||
// `timestamp`) below.
|
||||
@ -1248,7 +1249,7 @@ Transaction Storage::CreateTransaction() {
|
||||
start_timestamp = timestamp_++;
|
||||
}
|
||||
}
|
||||
return {transaction_id, start_timestamp};
|
||||
return {transaction_id, start_timestamp, isolation_level};
|
||||
}
|
||||
|
||||
template <bool force>
|
||||
@ -1736,7 +1737,7 @@ void Storage::CreateSnapshot() {
|
||||
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
|
||||
// Create the transaction used to create the snapshot.
|
||||
auto transaction = CreateTransaction();
|
||||
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION);
|
||||
|
||||
// Create snapshot.
|
||||
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
|
||||
@ -1894,4 +1895,9 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
|
||||
});
|
||||
}
|
||||
|
||||
void Storage::SetIsolationLevel(IsolationLevel isolation_level) {
|
||||
std::unique_lock main_guard{main_lock_};
|
||||
isolation_level_ = isolation_level;
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/indices.hpp"
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/name_id_mapper.hpp"
|
||||
#include "storage/v2/result.hpp"
|
||||
@ -184,7 +185,7 @@ class Storage final {
|
||||
private:
|
||||
friend class Storage;
|
||||
|
||||
explicit Accessor(Storage *storage);
|
||||
explicit Accessor(Storage *storage, IsolationLevel isolation_level);
|
||||
|
||||
public:
|
||||
Accessor(const Accessor &) = delete;
|
||||
@ -322,7 +323,9 @@ class Storage final {
|
||||
Config::Items config_;
|
||||
};
|
||||
|
||||
Accessor Access() { return Accessor{this}; }
|
||||
Accessor Access(std::optional<IsolationLevel> override_isolation_level = {}) {
|
||||
return Accessor{this, override_isolation_level.value_or(isolation_level_)};
|
||||
}
|
||||
|
||||
const std::string &LabelToName(LabelId label) const;
|
||||
const std::string &PropertyToName(PropertyId property) const;
|
||||
@ -423,8 +426,10 @@ class Storage final {
|
||||
|
||||
void FreeMemory();
|
||||
|
||||
void SetIsolationLevel(IsolationLevel isolation_level);
|
||||
|
||||
private:
|
||||
Transaction CreateTransaction();
|
||||
Transaction CreateTransaction(IsolationLevel isolation_level);
|
||||
|
||||
/// The force parameter determines the behaviour of the garbage collector.
|
||||
/// If it's set to true, it will behave as a global operation, i.e. it can't
|
||||
@ -485,6 +490,7 @@ class Storage final {
|
||||
std::optional<CommitLog> commit_log_;
|
||||
|
||||
utils::Synchronized<std::list<Transaction>, utils::SpinLock> committed_transactions_;
|
||||
IsolationLevel isolation_level_;
|
||||
|
||||
Config config_;
|
||||
utils::Scheduler gc_runner_;
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
@ -19,8 +20,12 @@ const uint64_t kTimestampInitialId = 0;
|
||||
const uint64_t kTransactionInitialId = 1ULL << 63U;
|
||||
|
||||
struct Transaction {
|
||||
Transaction(uint64_t transaction_id, uint64_t start_timestamp)
|
||||
: transaction_id(transaction_id), start_timestamp(start_timestamp), command_id(0), must_abort(false) {}
|
||||
Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level)
|
||||
: transaction_id(transaction_id),
|
||||
start_timestamp(start_timestamp),
|
||||
command_id(0),
|
||||
must_abort(false),
|
||||
isolation_level(isolation_level) {}
|
||||
|
||||
Transaction(Transaction &&other) noexcept
|
||||
: transaction_id(other.transaction_id),
|
||||
@ -28,7 +33,8 @@ struct Transaction {
|
||||
commit_timestamp(std::move(other.commit_timestamp)),
|
||||
command_id(other.command_id),
|
||||
deltas(std::move(other.deltas)),
|
||||
must_abort(other.must_abort) {}
|
||||
must_abort(other.must_abort),
|
||||
isolation_level(other.isolation_level) {}
|
||||
|
||||
Transaction(const Transaction &) = delete;
|
||||
Transaction &operator=(const Transaction &) = delete;
|
||||
@ -52,6 +58,7 @@ struct Transaction {
|
||||
uint64_t command_id;
|
||||
std::list<Delta> deltas;
|
||||
bool must_abort;
|
||||
IsolationLevel isolation_level;
|
||||
};
|
||||
|
||||
inline bool operator==(const Transaction &first, const Transaction &second) {
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include "communication/result_stream_faker.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
|
||||
class ExpansionBenchFixture : public benchmark::Fixture {
|
||||
|
@ -1,3 +1,4 @@
|
||||
add_subdirectory(replication)
|
||||
add_subdirectory(memory)
|
||||
add_subdirectory(triggers)
|
||||
add_subdirectory(isolation_levels)
|
||||
|
2
tests/e2e/isolation_levels/CMakeLists.txt
Normal file
2
tests/e2e/isolation_levels/CMakeLists.txt
Normal file
@ -0,0 +1,2 @@
|
||||
add_executable(memgraph__e2e__isolation_levels isolation_levels.cpp)
|
||||
target_link_libraries(memgraph__e2e__isolation_levels gflags mgclient mg-utils mg-io Threads::Threads)
|
241
tests/e2e/isolation_levels/isolation_levels.cpp
Normal file
241
tests/e2e/isolation_levels/isolation_levels.cpp
Normal file
@ -0,0 +1,241 @@
|
||||
#include <gflags/gflags.h>
|
||||
#include <mgclient.hpp>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_uint64(bolt_port, 7687, "Bolt port");
|
||||
DEFINE_uint64(timeout, 120, "Timeout seconds");
|
||||
|
||||
namespace {
|
||||
|
||||
auto GetClient() {
|
||||
auto client =
|
||||
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
|
||||
MG_ASSERT(client, "Failed to connect!");
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
auto GetVertexCount(std::unique_ptr<mg::Client> &client) {
|
||||
MG_ASSERT(client->Execute("MATCH (n) RETURN count(n)"));
|
||||
auto maybe_row = client->FetchOne();
|
||||
MG_ASSERT(maybe_row, "Failed to fetch vertex count");
|
||||
|
||||
const auto &row = *maybe_row;
|
||||
MG_ASSERT(row.size() == 1, "Got invalid result for vertex count");
|
||||
|
||||
client->FetchOne();
|
||||
return row[0].ValueInt();
|
||||
}
|
||||
|
||||
void CleanDatabase() {
|
||||
auto client = GetClient();
|
||||
MG_ASSERT(client->Execute("MATCH (n) DETACH DELETE n;"));
|
||||
client->DiscardAll();
|
||||
}
|
||||
|
||||
void TestSnapshotIsolation(std::unique_ptr<mg::Client> &client) {
|
||||
spdlog::info("Verifying SNAPSHOT ISOLATION");
|
||||
|
||||
auto creator = GetClient();
|
||||
|
||||
MG_ASSERT(client->BeginTransaction());
|
||||
MG_ASSERT(creator->BeginTransaction());
|
||||
|
||||
constexpr auto vertex_count = 10;
|
||||
for (size_t i = 0; i < vertex_count; ++i) {
|
||||
MG_ASSERT(creator->Execute("CREATE ()"));
|
||||
creator->DiscardAll();
|
||||
|
||||
auto current_vertex_count = GetVertexCount(client);
|
||||
MG_ASSERT(current_vertex_count == 0,
|
||||
"Invalid number of vertices found for SNAPSHOT ISOLATION (found {}, expected {}). Read vertices from a "
|
||||
"transaction which started "
|
||||
"at a later point.",
|
||||
current_vertex_count, 0);
|
||||
}
|
||||
|
||||
MG_ASSERT(creator->CommitTransaction());
|
||||
|
||||
auto current_vertex_count = GetVertexCount(client);
|
||||
MG_ASSERT(current_vertex_count == 0,
|
||||
"Invalid number of vertices found for SNAPSHOT ISOLATION (found {}, expected {}). Read vertices from a "
|
||||
"transaction which started "
|
||||
"at a later point.",
|
||||
current_vertex_count, 0);
|
||||
MG_ASSERT(client->CommitTransaction());
|
||||
CleanDatabase();
|
||||
}
|
||||
|
||||
void TestReadCommitted(std::unique_ptr<mg::Client> &client) {
|
||||
spdlog::info("Verifying READ COMMITTED");
|
||||
|
||||
auto creator = GetClient();
|
||||
|
||||
MG_ASSERT(client->BeginTransaction());
|
||||
MG_ASSERT(creator->BeginTransaction());
|
||||
|
||||
constexpr auto vertex_count = 10;
|
||||
for (size_t i = 0; i < vertex_count; ++i) {
|
||||
MG_ASSERT(creator->Execute("CREATE ()"));
|
||||
creator->DiscardAll();
|
||||
|
||||
auto current_vertex_count = GetVertexCount(client);
|
||||
MG_ASSERT(current_vertex_count == 0,
|
||||
"Invalid number of vertices found for READ COMMITTED (found {}, expected {}. Read vertices from a "
|
||||
"transaction which is not "
|
||||
"committed.",
|
||||
current_vertex_count, 0);
|
||||
}
|
||||
|
||||
MG_ASSERT(creator->CommitTransaction());
|
||||
|
||||
auto current_vertex_count = GetVertexCount(client);
|
||||
MG_ASSERT(current_vertex_count == vertex_count,
|
||||
"Invalid number of vertices found for READ COMMITTED (found {}, expected {}). Failed to read vertices "
|
||||
"from a committed transaction",
|
||||
current_vertex_count, vertex_count);
|
||||
MG_ASSERT(client->CommitTransaction());
|
||||
CleanDatabase();
|
||||
}
|
||||
|
||||
void TestReadUncommitted(std::unique_ptr<mg::Client> &client) {
|
||||
spdlog::info("Verifying READ UNCOMMITTED");
|
||||
|
||||
auto creator = GetClient();
|
||||
|
||||
MG_ASSERT(client->BeginTransaction());
|
||||
MG_ASSERT(creator->BeginTransaction());
|
||||
|
||||
constexpr auto vertex_count = 10;
|
||||
for (size_t i = 1; i <= vertex_count; ++i) {
|
||||
MG_ASSERT(creator->Execute("CREATE ()"));
|
||||
creator->DiscardAll();
|
||||
|
||||
auto current_vertex_count = GetVertexCount(client);
|
||||
MG_ASSERT(current_vertex_count == i,
|
||||
"Invalid number of vertices found for READ UNCOMMITTED (found {}, expected {}). Failed to read vertices "
|
||||
"from a different transaction.",
|
||||
current_vertex_count, i);
|
||||
}
|
||||
|
||||
MG_ASSERT(creator->CommitTransaction());
|
||||
|
||||
auto current_vertex_count = GetVertexCount(client);
|
||||
MG_ASSERT(current_vertex_count == vertex_count,
|
||||
"Invalid number of vertices found for READ UNCOMMITTED (found {}, expected {}). Failed to read vertices "
|
||||
"from a different transaction",
|
||||
current_vertex_count, vertex_count);
|
||||
MG_ASSERT(client->CommitTransaction());
|
||||
CleanDatabase();
|
||||
}
|
||||
|
||||
constexpr std::array isolation_levels{std::pair{"SNAPSHOT ISOLATION", &TestSnapshotIsolation},
|
||||
std::pair{"READ COMMITTED", &TestReadCommitted},
|
||||
std::pair{"READ UNCOMMITTED", &TestReadUncommitted}};
|
||||
|
||||
void TestGlobalIsolationLevel() {
|
||||
spdlog::info("\n\n----Test global isolation levels----\n");
|
||||
auto first_client = GetClient();
|
||||
auto second_client = GetClient();
|
||||
|
||||
for (const auto &[isolation_level, verification_function] : isolation_levels) {
|
||||
spdlog::info("--------------------------");
|
||||
spdlog::info("Setting global isolation level to {}", isolation_level);
|
||||
MG_ASSERT(first_client->Execute(fmt::format("SET GLOBAL TRANSACTION ISOLATION LEVEL {}", isolation_level)));
|
||||
first_client->DiscardAll();
|
||||
|
||||
verification_function(first_client);
|
||||
verification_function(second_client);
|
||||
spdlog::info("--------------------------\n");
|
||||
}
|
||||
}
|
||||
|
||||
void TestSessionIsolationLevel() {
|
||||
spdlog::info("\n\n----Test session isolation levels----\n");
|
||||
|
||||
auto global_client = GetClient();
|
||||
auto session_client = GetClient();
|
||||
for (const auto &[global_isolation_level, global_verification_function] : isolation_levels) {
|
||||
spdlog::info("Setting global isolation level to {}", global_isolation_level);
|
||||
MG_ASSERT(global_client->Execute(fmt::format("SET GLOBAL TRANSACTION ISOLATION LEVEL {}", global_isolation_level)));
|
||||
global_client->DiscardAll();
|
||||
|
||||
for (const auto &[session_isolation_level, session_verification_function] : isolation_levels) {
|
||||
spdlog::info("--------------------------");
|
||||
spdlog::info("Setting session isolation level to {}", session_isolation_level);
|
||||
MG_ASSERT(
|
||||
session_client->Execute(fmt::format("SET SESSION TRANSACTION ISOLATION LEVEL {}", session_isolation_level)));
|
||||
session_client->DiscardAll();
|
||||
|
||||
spdlog::info("Verifying client which is using global isolation level");
|
||||
global_verification_function(global_client);
|
||||
spdlog::info("Verifying client which is using session isolation level");
|
||||
session_verification_function(session_client);
|
||||
spdlog::info("--------------------------\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Priority of applying the isolation level from highest priority NEXT -> SESSION -> GLOBAL
|
||||
void TestNextIsolationLevel() {
|
||||
spdlog::info("\n\n----Test next isolation levels----\n");
|
||||
|
||||
auto global_client = GetClient();
|
||||
auto session_client = GetClient();
|
||||
for (const auto &[global_isolation_level, global_verification_function] : isolation_levels) {
|
||||
spdlog::info("Setting global isolation level to {}", global_isolation_level);
|
||||
MG_ASSERT(global_client->Execute(fmt::format("SET GLOBAL TRANSACTION ISOLATION LEVEL {}", global_isolation_level)));
|
||||
global_client->DiscardAll();
|
||||
|
||||
for (const auto &[session_isolation_level, session_verification_function] : isolation_levels) {
|
||||
spdlog::info("Setting session isolation level to {}", session_isolation_level);
|
||||
MG_ASSERT(
|
||||
session_client->Execute(fmt::format("SET SESSION TRANSACTION ISOLATION LEVEL {}", session_isolation_level)));
|
||||
session_client->DiscardAll();
|
||||
|
||||
for (const auto &[next_isolation_level, next_verification_function] : isolation_levels) {
|
||||
spdlog::info("--------------------------");
|
||||
spdlog::info("Verifying client which is using global isolation level");
|
||||
global_verification_function(global_client);
|
||||
spdlog::info("Verifying client which is using session isolation level");
|
||||
session_verification_function(session_client);
|
||||
|
||||
spdlog::info("Setting isolation level of the next transaction to {}", next_isolation_level);
|
||||
MG_ASSERT(global_client->Execute(fmt::format("SET NEXT TRANSACTION ISOLATION LEVEL {}", next_isolation_level)));
|
||||
global_client->DiscardAll();
|
||||
MG_ASSERT(
|
||||
session_client->Execute(fmt::format("SET NEXT TRANSACTION ISOLATION LEVEL {}", next_isolation_level)));
|
||||
session_client->DiscardAll();
|
||||
|
||||
spdlog::info("Verifying client which is using global isolation level while next isolation level is set");
|
||||
next_verification_function(global_client);
|
||||
spdlog::info("Verifying client which is using session isolation level while next isolation level is set");
|
||||
next_verification_function(session_client);
|
||||
|
||||
spdlog::info("Verifying client which is using global isolation level after the next isolation level was used");
|
||||
global_verification_function(global_client);
|
||||
spdlog::info("Verifying client which is using session isolation level after the next isolation level was used");
|
||||
session_verification_function(session_client);
|
||||
spdlog::info("--------------------------\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::SetUsageMessage("Memgraph E2E Isolation Levels");
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
logging::RedirectToStderr();
|
||||
|
||||
mg::Client::Init();
|
||||
|
||||
TestGlobalIsolationLevel();
|
||||
TestSessionIsolationLevel();
|
||||
TestNextIsolationLevel();
|
||||
|
||||
return 0;
|
||||
}
|
14
tests/e2e/isolation_levels/workloads.yaml
Normal file
14
tests/e2e/isolation_levels/workloads.yaml
Normal file
@ -0,0 +1,14 @@
|
||||
bolt_port: &bolt_port "7687"
|
||||
template_cluster: &template_cluster
|
||||
cluster:
|
||||
main:
|
||||
args: ["--bolt-port", *bolt_port, "--log-level=TRACE"]
|
||||
log_file: "isolation-levels-e2e.log"
|
||||
setup_queries: []
|
||||
validation_queries: []
|
||||
|
||||
workloads:
|
||||
- name: "Isolation levels"
|
||||
binary: "tests/e2e/isolation_levels/memgraph__e2e__isolation_levels"
|
||||
args: ["--bolt-port", *bolt_port]
|
||||
<<: *template_cluster
|
@ -1,5 +1,6 @@
|
||||
#include "communication/result_stream_faker.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
#include "utils/on_scope_exit.hpp"
|
||||
|
||||
|
@ -279,6 +279,9 @@ target_link_libraries(${test_prefix}storage_v2_wal_file mg-storage-v2 fmt)
|
||||
add_unit_test(storage_v2_replication.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
|
||||
|
||||
add_unit_test(storage_v2_isolation_level.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v2_isolation_level mg-storage-v2)
|
||||
|
||||
# Test mg-auth
|
||||
|
||||
if (MG_ENTERPRISE)
|
||||
|
@ -2065,6 +2065,8 @@ TEST_P(CypherMainVisitorTest, GrantPrivilege) {
|
||||
{AuthQuery::Privilege::FREE_MEMORY});
|
||||
check_auth_query(&ast_generator, "GRANT TRIGGER TO user", AuthQuery::Action::GRANT_PRIVILEGE, "", "", "user", {},
|
||||
{AuthQuery::Privilege::TRIGGER});
|
||||
check_auth_query(&ast_generator, "GRANT CONFIG TO user", AuthQuery::Action::GRANT_PRIVILEGE, "", "", "user", {},
|
||||
{AuthQuery::Privilege::CONFIG});
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, DenyPrivilege) {
|
||||
@ -3129,20 +3131,20 @@ TEST_P(CypherMainVisitorTest, CreateTriggers) {
|
||||
|
||||
const auto *query_template = "CREATE TRIGGER trigger {} {} COMMIT EXECUTE {}";
|
||||
|
||||
std::array events{std::pair{"", query::TriggerQuery::EventType::ANY},
|
||||
std::pair{"ON CREATE", query::TriggerQuery::EventType::CREATE},
|
||||
std::pair{"ON () CREATE", query::TriggerQuery::EventType::VERTEX_CREATE},
|
||||
std::pair{"ON --> CREATE", query::TriggerQuery::EventType::EDGE_CREATE},
|
||||
std::pair{"ON DELETE", query::TriggerQuery::EventType::DELETE},
|
||||
std::pair{"ON () DELETE", query::TriggerQuery::EventType::VERTEX_DELETE},
|
||||
std::pair{"ON --> DELETE", query::TriggerQuery::EventType::EDGE_DELETE},
|
||||
std::pair{"ON UPDATE", query::TriggerQuery::EventType::UPDATE},
|
||||
std::pair{"ON () UPDATE", query::TriggerQuery::EventType::VERTEX_UPDATE},
|
||||
std::pair{"ON --> UPDATE", query::TriggerQuery::EventType::EDGE_UPDATE}};
|
||||
constexpr std::array events{std::pair{"", query::TriggerQuery::EventType::ANY},
|
||||
std::pair{"ON CREATE", query::TriggerQuery::EventType::CREATE},
|
||||
std::pair{"ON () CREATE", query::TriggerQuery::EventType::VERTEX_CREATE},
|
||||
std::pair{"ON --> CREATE", query::TriggerQuery::EventType::EDGE_CREATE},
|
||||
std::pair{"ON DELETE", query::TriggerQuery::EventType::DELETE},
|
||||
std::pair{"ON () DELETE", query::TriggerQuery::EventType::VERTEX_DELETE},
|
||||
std::pair{"ON --> DELETE", query::TriggerQuery::EventType::EDGE_DELETE},
|
||||
std::pair{"ON UPDATE", query::TriggerQuery::EventType::UPDATE},
|
||||
std::pair{"ON () UPDATE", query::TriggerQuery::EventType::VERTEX_UPDATE},
|
||||
std::pair{"ON --> UPDATE", query::TriggerQuery::EventType::EDGE_UPDATE}};
|
||||
|
||||
std::array phases{"BEFORE", "AFTER"};
|
||||
constexpr std::array phases{"BEFORE", "AFTER"};
|
||||
|
||||
std::array statements{
|
||||
constexpr std::array statements{
|
||||
"", "SOME SUPER\nSTATEMENT", "Statement with 12312321 3 ", " Statement with 12312321 3 "
|
||||
|
||||
};
|
||||
@ -3157,4 +3159,43 @@ TEST_P(CypherMainVisitorTest, CreateTriggers) {
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
void ValidateSetIsolationLevelQuery(Base &ast_generator, const auto &query, const auto scope,
|
||||
const auto isolation_level) {
|
||||
auto *parsed_query = dynamic_cast<IsolationLevelQuery *>(ast_generator.ParseQuery(query));
|
||||
EXPECT_EQ(parsed_query->isolation_level_scope_, scope);
|
||||
EXPECT_EQ(parsed_query->isolation_level_, isolation_level);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_P(CypherMainVisitorTest, SetIsolationLevelQuery) {
|
||||
auto &ast_generator = *GetParam();
|
||||
TestInvalidQuery("SET ISO", ast_generator);
|
||||
TestInvalidQuery("SET TRANSACTION ISOLATION", ast_generator);
|
||||
TestInvalidQuery("SET TRANSACTION ISOLATION LEVEL", ast_generator);
|
||||
TestInvalidQuery("SET TRANSACTION ISOLATION LEVEL READ COMMITTED", ast_generator);
|
||||
TestInvalidQuery("SET NEXT TRANSACTION ISOLATION LEVEL", ast_generator);
|
||||
TestInvalidQuery("SET ISOLATION LEVEL READ COMMITTED", ast_generator);
|
||||
TestInvalidQuery("SET GLOBAL ISOLATION LEVEL READ COMMITTED", ast_generator);
|
||||
TestInvalidQuery("SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMITTED", ast_generator);
|
||||
TestInvalidQuery("SET GLOBAL TRANSACTION ISOLATION LEVEL READ_COMITTED", ast_generator);
|
||||
TestInvalidQuery("SET SESSION TRANSACTION ISOLATION LEVEL READCOMITTED", ast_generator);
|
||||
|
||||
constexpr std::array scopes{std::pair{"GLOBAL", query::IsolationLevelQuery::IsolationLevelScope::GLOBAL},
|
||||
std::pair{"SESSION", query::IsolationLevelQuery::IsolationLevelScope::SESSION},
|
||||
std::pair{"NEXT", query::IsolationLevelQuery::IsolationLevelScope::NEXT}};
|
||||
constexpr std::array isolation_levels{
|
||||
std::pair{"READ UNCOMMITTED", query::IsolationLevelQuery::IsolationLevel::READ_UNCOMMITTED},
|
||||
std::pair{"READ COMMITTED", query::IsolationLevelQuery::IsolationLevel::READ_COMMITTED},
|
||||
std::pair{"SNAPSHOT ISOLATION", query::IsolationLevelQuery::IsolationLevel::SNAPSHOT_ISOLATION}};
|
||||
|
||||
constexpr const auto *query_template = "SET {} TRANSACTION ISOLATION LEVEL {}";
|
||||
|
||||
for (const auto &[scope_string, scope] : scopes) {
|
||||
for (const auto &[isolation_level_string, isolation_level] : isolation_levels) {
|
||||
ValidateSetIsolationLevelQuery(ast_generator, fmt::format(query_template, scope_string, isolation_level_string),
|
||||
scope, isolation_level);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "query/stream.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "query_common.hpp"
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "utils/csv_parsing.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
@ -154,3 +154,8 @@ TEST_F(TestPrivilegeExtractor, TriggerQuery) {
|
||||
auto *query = storage.Create<TriggerQuery>();
|
||||
EXPECT_THAT(GetRequiredPrivileges(query), UnorderedElementsAre(AuthQuery::Privilege::TRIGGER));
|
||||
}
|
||||
|
||||
TEST_F(TestPrivilegeExtractor, SetIsolationLevelQuery) {
|
||||
auto *query = storage.Create<IsolationLevelQuery>();
|
||||
EXPECT_THAT(GetRequiredPrivileges(query), UnorderedElementsAre(AuthQuery::Privilege::CONFIG));
|
||||
}
|
||||
|
98
tests/unit/storage_v2_isolation_level.cpp
Normal file
98
tests/unit/storage_v2_isolation_level.cpp
Normal file
@ -0,0 +1,98 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "storage/v2/isolation_level.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
|
||||
namespace {
|
||||
int64_t VerticesCount(storage::Storage::Accessor &accessor) {
|
||||
int64_t count{0};
|
||||
for ([[maybe_unused]] const auto &vertex : accessor.Vertices(storage::View::NEW)) {
|
||||
++count;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
constexpr std::array isolation_levels{storage::IsolationLevel::SNAPSHOT_ISOLATION,
|
||||
storage::IsolationLevel::READ_COMMITTED,
|
||||
storage::IsolationLevel::READ_UNCOMMITTED};
|
||||
|
||||
std::string_view IsolationLevelToString(const storage::IsolationLevel isolation_level) {
|
||||
switch (isolation_level) {
|
||||
case storage::IsolationLevel::SNAPSHOT_ISOLATION:
|
||||
return "SNAPSHOT_ISOLATION";
|
||||
case storage::IsolationLevel::READ_COMMITTED:
|
||||
return "READ_COMMITTED";
|
||||
case storage::IsolationLevel::READ_UNCOMMITTED:
|
||||
return "READ_UNCOMMITTED";
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
class StorageIsolationLevelTest : public ::testing::TestWithParam<storage::IsolationLevel> {
|
||||
public:
|
||||
struct PrintToStringParamName {
|
||||
std::string operator()(const testing::TestParamInfo<storage::IsolationLevel> &info) {
|
||||
return std::string(IsolationLevelToString(static_cast<storage::IsolationLevel>(info.param)));
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
TEST_P(StorageIsolationLevelTest, Visibility) {
|
||||
const auto default_isolation_level = GetParam();
|
||||
|
||||
for (const auto override_isolation_level : isolation_levels) {
|
||||
storage::Storage storage{storage::Config{.transaction = {.isolation_level = default_isolation_level}}};
|
||||
auto creator = storage.Access();
|
||||
auto default_isolation_level_reader = storage.Access();
|
||||
auto override_isolation_level_reader = storage.Access(override_isolation_level);
|
||||
|
||||
ASSERT_EQ(VerticesCount(default_isolation_level_reader), 0);
|
||||
ASSERT_EQ(VerticesCount(override_isolation_level_reader), 0);
|
||||
|
||||
constexpr auto iteration_count = 10;
|
||||
{
|
||||
SCOPED_TRACE(fmt::format(
|
||||
"Visibility while the creator transaction is active "
|
||||
"(default isolation level = {}, override isolation level = {})",
|
||||
IsolationLevelToString(default_isolation_level), IsolationLevelToString(override_isolation_level)));
|
||||
for (size_t i = 1; i <= iteration_count; ++i) {
|
||||
creator.CreateVertex();
|
||||
|
||||
const auto check_vertices_count = [i](auto &accessor, const auto isolation_level) {
|
||||
const auto expected_count = isolation_level == storage::IsolationLevel::READ_UNCOMMITTED ? i : 0;
|
||||
EXPECT_EQ(VerticesCount(accessor), expected_count);
|
||||
};
|
||||
check_vertices_count(default_isolation_level_reader, default_isolation_level);
|
||||
check_vertices_count(override_isolation_level_reader, override_isolation_level);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_FALSE(creator.Commit().HasError());
|
||||
{
|
||||
SCOPED_TRACE(fmt::format(
|
||||
"Visibility after the creator transaction is committed "
|
||||
"(default isolation level = {}, override isolation level = {})",
|
||||
IsolationLevelToString(default_isolation_level), IsolationLevelToString(override_isolation_level)));
|
||||
const auto check_vertices_count = [iteration_count](auto &accessor, const auto isolation_level) {
|
||||
const auto expected_count =
|
||||
isolation_level == storage::IsolationLevel::SNAPSHOT_ISOLATION ? 0 : iteration_count;
|
||||
ASSERT_EQ(VerticesCount(accessor), expected_count);
|
||||
};
|
||||
|
||||
check_vertices_count(default_isolation_level_reader, default_isolation_level);
|
||||
check_vertices_count(override_isolation_level_reader, override_isolation_level);
|
||||
}
|
||||
|
||||
ASSERT_FALSE(default_isolation_level_reader.Commit().HasError());
|
||||
ASSERT_FALSE(override_isolation_level_reader.Commit().HasError());
|
||||
|
||||
SCOPED_TRACE("Visibility after a new transaction is started");
|
||||
auto verifier = storage.Access();
|
||||
ASSERT_EQ(VerticesCount(verifier), iteration_count);
|
||||
ASSERT_FALSE(verifier.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(ParameterizedStorageIsolationLevelTests, StorageIsolationLevelTest,
|
||||
::testing::ValuesIn(isolation_levels), StorageIsolationLevelTest::PrintToStringParamName());
|
@ -45,7 +45,9 @@ class DeltaGenerator final {
|
||||
private:
|
||||
friend class DeltaGenerator;
|
||||
|
||||
explicit Transaction(DeltaGenerator *gen) : gen_(gen), transaction_(gen->transaction_id_++, gen->timestamp_++) {}
|
||||
explicit Transaction(DeltaGenerator *gen)
|
||||
: gen_(gen),
|
||||
transaction_(gen->transaction_id_++, gen->timestamp_++, storage::IsolationLevel::SNAPSHOT_ISOLATION) {}
|
||||
|
||||
public:
|
||||
storage::Vertex *CreateVertex() {
|
||||
|
Loading…
Reference in New Issue
Block a user