Add memory limit clause for cypher queries (#106)

* Add memory limit clause for cypher queries

* Add PROCEDURE and QUERY keywords

* Improve memory limit logs

* Update CHANGELOG
This commit is contained in:
antonio2368 2021-03-16 09:05:38 +01:00 committed by Antonio Andelic
parent 9c6bf4b1b8
commit ad4c80af13
12 changed files with 216 additions and 45 deletions

View File

@ -2,6 +2,11 @@
## Future
### Breaking Changes
* Changed `MEMORY LIMIT num (KB|MB)` clause in the procedure calls to `PROCEDURE MEMORY LIMIT num (KB|MB)`.
The functionality is still the same.
### Major Feature and Improvements
* Added replication to community version.
@ -15,6 +20,8 @@
memgraph can allocate during its runtime.
* Added `FREE MEMORY` query which tries to free unused memory chunks in different parts of storage.
* Added the memory limit and amount of currently allocated bytes in the result of `SHOW STORAGE INFO` query.
* Added `QUERY MEMORY LIMIT num (KB|MB)` to Cypher queries which allows you to limit memory allocation for
the entire query. It can be added only at the end of the entire Cypher query.
### Bug Fixes

View File

@ -17,6 +17,7 @@ set(mg_query_sources
frontend/semantic/symbol_generator.cpp
frontend/stripped.cpp
interpret/awesome_memgraph_functions.cpp
interpret/eval.cpp
interpreter.cpp
plan/operator.cpp
plan/preprocess.cpp

View File

@ -1543,7 +1543,11 @@ cpp<#
:scope :public
:slk-save #'slk-save-ast-vector
:slk-load (slk-load-ast-vector "CypherUnion")
:documentation "Contains remaining queries that should form and union with `single_query_`."))
:documentation "Contains remaining queries that should form and union with `single_query_`.")
(memory-limit "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(memory-scale "size_t" :initval "1024U" :scope :public))
(:public
#>cpp
CypherQuery() = default;

View File

@ -33,6 +33,28 @@ namespace query::frontend {
const std::string CypherMainVisitor::kAnonPrefix = "anon";
namespace {
/// Translates a parsed memory limit clause into the expression holding the
/// limit amount and the number of bytes in one unit of that amount.
///
/// @param memory_limit_ctx parse tree node of the clause; must not be null.
/// @param visitor visitor used to turn the literal into an AST expression.
/// @return std::nullopt when the clause is `MEMORY UNLIMITED`; otherwise the
///         visited literal paired with 1024 * 1024 for MB or 1024 for KB.
template <typename TVisitor>
std::optional<std::pair<query::Expression *, size_t>> VisitMemoryLimit(
    MemgraphCypher::MemoryLimitContext *memory_limit_ctx, TVisitor *visitor) {
  MG_ASSERT(memory_limit_ctx);
  if (memory_limit_ctx->UNLIMITED()) {
    return std::nullopt;
  }

  auto limit_expression = memory_limit_ctx->literal()->accept(visitor);

  size_t megabyte_scale = 1024U * 1024U;
  if (memory_limit_ctx->MB()) {
    return std::make_pair(limit_expression, megabyte_scale);
  }

  // Anything that is not MB is expected to carry the KB unit.
  MG_ASSERT(memory_limit_ctx->KB());
  size_t kilobyte_scale = 1024U;
  return std::make_pair(limit_expression, kilobyte_scale);
}
} // namespace
antlrcpp::Any CypherMainVisitor::visitExplainQuery(MemgraphCypher::ExplainQueryContext *ctx) {
MG_ASSERT(ctx->children.size() == 2, "ExplainQuery should have exactly two children!");
auto *cypher_query = ctx->children[1]->accept(this).as<CypherQuery *>();
@ -127,6 +149,14 @@ antlrcpp::Any CypherMainVisitor::visitCypherQuery(MemgraphCypher::CypherQueryCon
cypher_query->cypher_unions_.push_back(child->accept(this).as<CypherUnion *>());
}
if (auto *memory_limit_ctx = ctx->queryMemoryLimit()) {
const auto memory_limit_info = VisitMemoryLimit(memory_limit_ctx->memoryLimit(), this);
if (memory_limit_info) {
cypher_query->memory_limit_ = memory_limit_info->first;
cypher_query->memory_scale_ = memory_limit_info->second;
}
}
query_ = cypher_query;
return cypher_query;
}
@ -489,21 +519,19 @@ antlrcpp::Any CypherMainVisitor::visitCallProcedure(MemgraphCypher::CallProcedur
for (auto *expr : ctx->expression()) {
call_proc->arguments_.push_back(expr->accept(this));
}
if (auto *memory_limit_ctx = ctx->callProcedureMemoryLimit()) {
if (memory_limit_ctx->LIMIT()) {
call_proc->memory_limit_ = memory_limit_ctx->literal()->accept(this);
if (memory_limit_ctx->MB()) {
call_proc->memory_scale_ = 1024U * 1024U;
} else {
MG_ASSERT(memory_limit_ctx->KB());
call_proc->memory_scale_ = 1024U;
}
if (auto *memory_limit_ctx = ctx->procedureMemoryLimit()) {
const auto memory_limit_info = VisitMemoryLimit(memory_limit_ctx->memoryLimit(), this);
if (memory_limit_info) {
call_proc->memory_limit_ = memory_limit_info->first;
call_proc->memory_scale_ = memory_limit_info->second;
}
} else {
// Default to 100 MB
call_proc->memory_limit_ = storage_->Create<PrimitiveLiteral>(TypedValue(100));
call_proc->memory_scale_ = 1024U * 1024U;
}
auto *yield_ctx = ctx->yieldProcedureResults();
if (!yield_ctx) {
const auto &maybe_found =

View File

@ -52,7 +52,7 @@ explainQuery : EXPLAIN cypherQuery ;
profileQuery : PROFILE cypherQuery ;
cypherQuery : singleQuery ( cypherUnion )* ;
cypherQuery : singleQuery ( cypherUnion )* ( queryMemoryLimit )? ;
indexQuery : createIndex | dropIndex;
@ -106,14 +106,18 @@ with : WITH ( DISTINCT )? returnBody ( where )? ;
cypherReturn : RETURN ( DISTINCT )? returnBody ;
callProcedure : CALL procedureName '(' ( expression ( ',' expression )* )? ')' ( callProcedureMemoryLimit )? ( yieldProcedureResults )? ;
callProcedure : CALL procedureName '(' ( expression ( ',' expression )* )? ')' ( procedureMemoryLimit )? ( yieldProcedureResults )? ;
procedureName : symbolicName ( '.' symbolicName )* ;
callProcedureMemoryLimit : MEMORY ( UNLIMITED | LIMIT literal ( MB | KB ) ) ;
yieldProcedureResults : YIELD ( '*' | ( procedureResult ( ',' procedureResult )* ) ) ;
// Shared memory limit clause: MEMORY followed by either UNLIMITED or
// LIMIT <literal> with a unit of MB or KB.
memoryLimit : MEMORY ( UNLIMITED | LIMIT literal ( MB | KB ) ) ;
// Memory limit clause introduced by the QUERY keyword.
queryMemoryLimit : QUERY memoryLimit ;
// Memory limit clause introduced by the PROCEDURE keyword.
procedureMemoryLimit : PROCEDURE memoryLimit ;
procedureResult : ( variable AS variable ) | variable ;
returnBody : returnItems ( order )? ( skip )? ( limit )? ;
@ -357,7 +361,9 @@ cypherKeyword : ALL
| OPTIONAL
| OR
| ORDER
| PROCEDURE
| PROFILE
| QUERY
| REDUCE
| REMOVE
| RETURN

View File

@ -118,7 +118,9 @@ ON : O N ;
OPTIONAL : O P T I O N A L ;
OR : O R ;
ORDER : O R D E R ;
PROCEDURE : P R O C E D U R E ;
PROFILE : P R O F I L E ;
QUERY : Q U E R Y ;
REDUCE : R E D U C E ;
REMOVE : R E M O V E ;
RETURN : R E T U R N ;

View File

@ -168,7 +168,8 @@ StrippedQuery::StrippedQuery(const std::string &query) : original_(query) {
for (;
jt != tokens.end() && (jt->second != "," || num_open_braces || num_open_parantheses || num_open_brackets) &&
!utils::IEquals(jt->second, "order") && !utils::IEquals(jt->second, "skip") &&
!utils::IEquals(jt->second, "limit") && !utils::IEquals(jt->second, "union") && jt->second != ";";
!utils::IEquals(jt->second, "limit") && !utils::IEquals(jt->second, "union") &&
!utils::IEquals(jt->second, "query") && jt->second != ";";
++jt) {
if (jt->second == "(") {
++num_open_parantheses;

View File

@ -87,7 +87,7 @@ const trie::Trie kKeywords = {
"single", "true", "false", "reduce", "coalesce", "user", "password", "alter", "drop",
"show", "stats", "unique", "explain", "profile", "storage", "index", "info", "exists",
"assert", "constraint", "node", "key", "dump", "database", "call", "yield", "memory",
"mb", "kb", "unlimited", "free"};
"mb", "kb", "unlimited", "free", "procedure", "query"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(

View File

@ -0,0 +1,24 @@
#include "query/interpret/eval.hpp"
namespace query {
// Evaluates `expr` with `evaluator` and returns the result as an integer.
// A TypedValueException raised while reading the integer is replaced by a
// QueryRuntimeException whose message is `what` followed by " must be an int".
int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr, const std::string &what) {
  // The evaluation itself stays outside the try block, so exceptions thrown
  // while evaluating the expression propagate unchanged.
  TypedValue evaluated = expr->Accept(*evaluator);
  try {
    return evaluated.ValueInt();
  } catch (const TypedValueException &) {
    throw QueryRuntimeException(what + " must be an int");
  }
}
/// Evaluates a memory limit expression and converts the result to bytes.
///
/// @param eval evaluator used to compute the value of `memory_limit`.
/// @param memory_limit expression holding the limit amount; may be null.
/// @param memory_scale number of bytes in one unit of the limit amount.
/// @return std::nullopt when `memory_limit` is null, otherwise the evaluated
///         amount multiplied by `memory_scale`.
/// @throw QueryRuntimeException if the evaluated value is not a strictly
///        positive integer or if the product does not fit in size_t.
std::optional<size_t> EvaluateMemoryLimit(ExpressionEvaluator *eval, Expression *memory_limit, size_t memory_scale) {
  if (!memory_limit) return std::nullopt;
  auto limit_value = memory_limit->Accept(*eval);
  // Zero is rejected too, so the message asks for a positive value.
  if (!limit_value.IsInt() || limit_value.ValueInt() <= 0)
    throw QueryRuntimeException("Memory limit must be a positive integer.");
  // The value is known to be > 0 here, so the conversion to size_t is safe.
  const auto limit = static_cast<size_t>(limit_value.ValueInt());
  // Compare against max / scale so the overflow check cannot itself overflow.
  if (std::numeric_limits<size_t>::max() / memory_scale < limit) throw QueryRuntimeException("Memory limit overflow.");
  return limit * memory_scale;
}
} // namespace query

View File

@ -656,13 +656,8 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
/// @param what - Name of what's getting evaluated. Used for user feedback (via
/// exception) when the evaluated value is not an int.
/// @throw QueryRuntimeException if expression doesn't evaluate to an int.
inline int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr, const std::string &what) {
  // The evaluation itself stays outside the try block, so exceptions thrown
  // while evaluating the expression propagate unchanged.
  TypedValue evaluated = expr->Accept(*evaluator);
  try {
    return evaluated.ValueInt();
  } catch (const TypedValueException &) {
    // Report the failure using the caller-supplied name of the value.
    throw QueryRuntimeException(what + " must be an int");
  }
}
int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr, const std::string &what);
std::optional<size_t> EvaluateMemoryLimit(ExpressionEvaluator *eval, Expression *memory_limit, size_t memory_scale);
} // namespace query

View File

@ -33,6 +33,7 @@
#include "utils/pmr/unordered_map.hpp"
#include "utils/pmr/unordered_set.hpp"
#include "utils/pmr/vector.hpp"
#include "utils/readable_size.hpp"
#include "utils/string.hpp"
// macro for the default implementation of LogicalOperator::Accept
@ -3486,16 +3487,6 @@ std::unordered_map<std::string, int64_t> CallProcedure::GetAndResetCounters() {
namespace {
/// Evaluates a memory limit expression and converts the result to bytes.
///
/// @param eval evaluator used to compute the value of `memory_limit`.
/// @param memory_limit expression holding the limit amount; may be null.
/// @param memory_scale number of bytes in one unit of the limit amount.
/// @return std::nullopt when `memory_limit` is null, otherwise the evaluated
///         amount multiplied by `memory_scale`.
/// @throw QueryRuntimeException if the evaluated value is not a strictly
///        positive integer or if the product does not fit in size_t.
std::optional<size_t> EvalMemoryLimit(ExpressionEvaluator *eval, Expression *memory_limit, size_t memory_scale) {
  if (!memory_limit) return std::nullopt;
  auto limit_value = memory_limit->Accept(*eval);
  // Zero is rejected too, so the message asks for a positive value.
  if (!limit_value.IsInt() || limit_value.ValueInt() <= 0)
    throw QueryRuntimeException("Memory limit must be a positive integer.");
  // The value is known to be > 0 here, so the conversion to size_t is safe.
  const auto limit = static_cast<size_t>(limit_value.ValueInt());
  // Compare against max / scale so the overflow check cannot itself overflow.
  if (std::numeric_limits<size_t>::max() / memory_scale < limit) throw QueryRuntimeException("Memory limit overflow.");
  return limit * memory_scale;
}
void CallCustomProcedure(const std::string_view &fully_qualified_procedure_name, const mgp_proc &proc,
const std::vector<Expression *> &args, const mgp_graph &graph, ExpressionEvaluator *evaluator,
utils::MemoryResource *memory, std::optional<size_t> memory_limit, mgp_result *result) {
@ -3545,7 +3536,8 @@ void CallCustomProcedure(const std::string_view &fully_qualified_procedure_name,
proc_args.elems.emplace_back(std::get<2>(proc.opt_args[i]), &graph);
}
if (memory_limit) {
SPDLOG_INFO("Running '{}' with memory limit of {} bytes", fully_qualified_procedure_name, *memory_limit);
SPDLOG_INFO("Running '{}' with memory limit of {}", fully_qualified_procedure_name,
utils::GetReadableSize(*memory_limit));
utils::LimitedMemoryResource limited_mem(memory, *memory_limit);
mgp_memory proc_memory{&limited_mem};
MG_ASSERT(result->signature == &proc.results);
@ -3624,7 +3616,7 @@ class CallProcedureCursor : public Cursor {
// TODO: This will probably need to be changed when we add support for
// generator like procedures which yield a new result on each invocation.
auto *memory = context.evaluation_context.memory;
auto memory_limit = EvalMemoryLimit(&evaluator, self_->memory_limit_, self_->memory_scale_);
auto memory_limit = EvaluateMemoryLimit(&evaluator, self_->memory_limit_, self_->memory_scale_);
mgp_graph graph{context.db_accessor, graph_view, &context};
CallCustomProcedure(self_->procedure_name_, *proc, self_->arguments_, graph, &evaluator, memory, memory_limit,
&result_);

View File

@ -18,6 +18,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/opencypher/parser.hpp"
@ -2730,7 +2731,8 @@ TEST_P(CypherMainVisitorTest, CallWithoutYield) {
TEST_P(CypherMainVisitorTest, CallWithMemoryLimitWithoutYield) {
auto &ast_generator = *GetParam();
auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("CALL mg.load_all() MEMORY LIMIT 32 KB"));
auto *query =
dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("CALL mg.load_all() PROCEDURE MEMORY LIMIT 32 KB"));
ASSERT_TRUE(query);
ASSERT_TRUE(query->single_query_);
auto *single_query = query->single_query_;
@ -2747,7 +2749,7 @@ TEST_P(CypherMainVisitorTest, CallWithMemoryLimitWithoutYield) {
TEST_P(CypherMainVisitorTest, CallWithMemoryUnlimitedWithoutYield) {
auto &ast_generator = *GetParam();
auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("CALL mg.load_all() MEMORY UNLIMITED"));
auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("CALL mg.load_all() PROCEDURE MEMORY UNLIMITED"));
ASSERT_TRUE(query);
ASSERT_TRUE(query->single_query_);
auto *single_query = query->single_query_;
@ -2763,8 +2765,8 @@ TEST_P(CypherMainVisitorTest, CallWithMemoryUnlimitedWithoutYield) {
TEST_P(CypherMainVisitorTest, CallProcedureWithMemoryLimit) {
auto &ast_generator = *GetParam();
auto *query =
dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("CALL proc.with.dots() MEMORY LIMIT 32 MB YIELD res"));
auto *query = dynamic_cast<CypherQuery *>(
ast_generator.ParseQuery("CALL proc.with.dots() PROCEDURE MEMORY LIMIT 32 MB YIELD res"));
ASSERT_TRUE(query);
ASSERT_TRUE(query->single_query_);
auto *single_query = query->single_query_;
@ -2788,8 +2790,8 @@ TEST_P(CypherMainVisitorTest, CallProcedureWithMemoryLimit) {
TEST_P(CypherMainVisitorTest, CallProcedureWithMemoryUnlimited) {
auto &ast_generator = *GetParam();
auto *query =
dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("CALL proc.with.dots() MEMORY UNLIMITED YIELD res"));
auto *query = dynamic_cast<CypherQuery *>(
ast_generator.ParseQuery("CALL proc.with.dots() PROCEDURE MEMORY UNLIMITED YIELD res"));
ASSERT_TRUE(query);
ASSERT_TRUE(query->single_query_);
auto *single_query = query->single_query_;
@ -2822,12 +2824,10 @@ TEST_P(CypherMainVisitorTest, IncorrectCallProcedure) {
ASSERT_THROW(ast_generator.ParseQuery("RETURN 42, CALL procedure() YIELD"), SyntaxException);
ASSERT_THROW(ast_generator.ParseQuery("RETURN 42, CALL procedure() YIELD res"), SyntaxException);
ASSERT_THROW(ast_generator.ParseQuery("RETURN 42 AS x CALL procedure() YIELD res"), SemanticException);
ASSERT_THROW(ast_generator.ParseQuery("CALL proc.with.dots() YIELD res MEMORY UNLIMITED"), SyntaxException);
ASSERT_THROW(ast_generator.ParseQuery("CALL proc.with.dots() YIELD res MEMORY LIMIT 32 KB"), SyntaxException);
ASSERT_THROW(ast_generator.ParseQuery("CALL proc.with.dots() MEMORY YIELD res"), SyntaxException);
// mg.procedures returns something, so it needs to have a YIELD.
ASSERT_THROW(ast_generator.ParseQuery("CALL mg.procedures()"), SemanticException);
ASSERT_THROW(ast_generator.ParseQuery("CALL mg.procedures() MEMORY UNLIMITED"), SemanticException);
ASSERT_THROW(ast_generator.ParseQuery("CALL mg.procedures() PROCEDURE MEMORY UNLIMITED"), SemanticException);
// TODO: Implement support for the following syntax. These are defined in
// Neo4j and accepted in openCypher CIP.
ASSERT_THROW(ast_generator.ParseQuery("CALL proc"), SyntaxException);
@ -2935,4 +2935,115 @@ TEST_P(CypherMainVisitorTest, TestLoadCsvClause) {
}
}
// Checks parsing of the `QUERY MEMORY LIMIT` clause, both on its own and in
// combination with `PROCEDURE MEMORY LIMIT` on a procedure call.
TEST_P(CypherMainVisitorTest, MemoryLimit) {
  auto &ast_generator = *GetParam();

  // Truncated, unit-less, unknown-unit and misplaced clauses are rejected.
  ASSERT_THROW(ast_generator.ParseQuery("RETURN x QUE"), SyntaxException);
  ASSERT_THROW(ast_generator.ParseQuery("RETURN x QUERY"), SyntaxException);
  ASSERT_THROW(ast_generator.ParseQuery("RETURN x QUERY MEM"), SyntaxException);
  ASSERT_THROW(ast_generator.ParseQuery("RETURN x QUERY MEMORY"), SyntaxException);
  ASSERT_THROW(ast_generator.ParseQuery("RETURN x QUERY MEMORY LIM"), SyntaxException);
  ASSERT_THROW(ast_generator.ParseQuery("RETURN x QUERY MEMORY LIMIT"), SyntaxException);
  ASSERT_THROW(ast_generator.ParseQuery("RETURN x QUERY MEMORY LIMIT KB"), SyntaxException);
  ASSERT_THROW(ast_generator.ParseQuery("RETURN x QUERY MEMORY LIMIT 12GB"), SyntaxException);
  ASSERT_THROW(ast_generator.ParseQuery("QUERY MEMORY LIMIT 12KB RETURN x"), SyntaxException);

  // No clause: the query carries no memory limit expression.
  {
    auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("RETURN x"));
    ASSERT_TRUE(query);
    ASSERT_FALSE(query->memory_limit_);
  }

  // Query limit in KB.
  {
    auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("RETURN x QUERY MEMORY LIMIT 12KB"));
    ASSERT_TRUE(query);
    ASSERT_TRUE(query->memory_limit_);
    ast_generator.CheckLiteral(query->memory_limit_, 12);
    ASSERT_EQ(query->memory_scale_, 1024U);
  }

  // Query limit in MB.
  {
    auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("RETURN x QUERY MEMORY LIMIT 12MB"));
    ASSERT_TRUE(query);
    ASSERT_TRUE(query->memory_limit_);
    ast_generator.CheckLiteral(query->memory_limit_, 12);
    ASSERT_EQ(query->memory_scale_, 1024U * 1024U);
  }

  // Query limit only: the procedure call keeps its default limit.
  {
    auto *query = dynamic_cast<CypherQuery *>(
        ast_generator.ParseQuery("CALL mg.procedures() YIELD x RETURN x QUERY MEMORY LIMIT 12MB"));
    ASSERT_TRUE(query);
    ASSERT_TRUE(query->memory_limit_);
    ast_generator.CheckLiteral(query->memory_limit_, 12);
    ASSERT_EQ(query->memory_scale_, 1024U * 1024U);

    ASSERT_TRUE(query->single_query_);
    auto *single_query = query->single_query_;
    ASSERT_EQ(single_query->clauses_.size(), 2U);
    auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]);
    // Fail the test instead of dereferencing null if the clause has another type.
    ASSERT_TRUE(call_proc);
    CheckCallProcedureDefaultMemoryLimit(ast_generator, *call_proc);
  }

  // Both limits present: each one is stored on its own AST node.
  {
    auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery(
        "CALL mg.procedures() PROCEDURE MEMORY LIMIT 3KB YIELD x RETURN x QUERY MEMORY LIMIT 12MB"));
    ASSERT_TRUE(query);
    ASSERT_TRUE(query->memory_limit_);
    ast_generator.CheckLiteral(query->memory_limit_, 12);
    ASSERT_EQ(query->memory_scale_, 1024U * 1024U);

    ASSERT_TRUE(query->single_query_);
    auto *single_query = query->single_query_;
    ASSERT_EQ(single_query->clauses_.size(), 2U);
    auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]);
    // Fail the test instead of dereferencing null if the clause has another type.
    ASSERT_TRUE(call_proc);
    ASSERT_TRUE(call_proc->memory_limit_);
    ast_generator.CheckLiteral(call_proc->memory_limit_, 3);
    ASSERT_EQ(call_proc->memory_scale_, 1024U);
  }

  // Procedure limit only, with YIELD: the query itself has no limit.
  {
    auto *query = dynamic_cast<CypherQuery *>(
        ast_generator.ParseQuery("CALL mg.procedures() PROCEDURE MEMORY LIMIT 3KB YIELD x RETURN x"));
    ASSERT_TRUE(query);
    ASSERT_FALSE(query->memory_limit_);

    ASSERT_TRUE(query->single_query_);
    auto *single_query = query->single_query_;
    ASSERT_EQ(single_query->clauses_.size(), 2U);
    auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]);
    // Fail the test instead of dereferencing null if the clause has another type.
    ASSERT_TRUE(call_proc);
    ASSERT_TRUE(call_proc->memory_limit_);
    ast_generator.CheckLiteral(call_proc->memory_limit_, 3);
    ASSERT_EQ(call_proc->memory_scale_, 1024U);
  }

  // Procedure limit only, without YIELD: the query itself has no limit.
  {
    auto *query =
        dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("CALL mg.load_all() PROCEDURE MEMORY LIMIT 3KB"));
    ASSERT_TRUE(query);
    ASSERT_FALSE(query->memory_limit_);

    ASSERT_TRUE(query->single_query_);
    auto *single_query = query->single_query_;
    ASSERT_EQ(single_query->clauses_.size(), 1U);
    auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]);
    // Fail the test instead of dereferencing null if the clause has another type.
    ASSERT_TRUE(call_proc);
    ASSERT_TRUE(call_proc->memory_limit_);
    ast_generator.CheckLiteral(call_proc->memory_limit_, 3);
    ASSERT_EQ(call_proc->memory_scale_, 1024U);
  }

  // Query limit directly after a call: it applies to the query, not the procedure.
  {
    auto *query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery("CALL mg.load_all() QUERY MEMORY LIMIT 3KB"));
    ASSERT_TRUE(query);
    ASSERT_TRUE(query->memory_limit_);
    ast_generator.CheckLiteral(query->memory_limit_, 3);
    ASSERT_EQ(query->memory_scale_, 1024U);

    ASSERT_TRUE(query->single_query_);
    auto *single_query = query->single_query_;
    ASSERT_EQ(single_query->clauses_.size(), 1U);
    auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]);
    // Fail the test instead of dereferencing null if the clause has another type.
    ASSERT_TRUE(call_proc);
    CheckCallProcedureDefaultMemoryLimit(ast_generator, *call_proc);
  }
}
} // namespace