Add back the query memory limit logic (#134)
This commit is contained in:
parent
5c93f81881
commit
8de31092ad
@ -27,6 +27,7 @@
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
#include "utils/readable_size.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/tsc.hpp"
|
||||
|
||||
@ -604,7 +605,7 @@ struct PullPlanVector {
|
||||
struct PullPlan {
|
||||
explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters ¶meters, bool is_profile_query,
|
||||
DbAccessor *dba, InterpreterContext *interpreter_context,
|
||||
utils::MonotonicBufferResource *execution_memory);
|
||||
utils::MonotonicBufferResource *execution_memory, std::optional<size_t> memory_limit = {});
|
||||
std::optional<ExecutionContext> Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
@ -614,6 +615,7 @@ struct PullPlan {
|
||||
plan::UniqueCursorPtr cursor_ = nullptr;
|
||||
Frame frame_;
|
||||
ExecutionContext ctx_;
|
||||
std::optional<size_t> memory_limit_;
|
||||
|
||||
// As it's possible to query execution using multiple pulls
|
||||
// we need the keep track of the total execution time across
|
||||
@ -631,10 +633,11 @@ struct PullPlan {
|
||||
|
||||
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters ¶meters, const bool is_profile_query,
|
||||
DbAccessor *dba, InterpreterContext *interpreter_context,
|
||||
utils::MonotonicBufferResource *execution_memory)
|
||||
utils::MonotonicBufferResource *execution_memory, const std::optional<size_t> memory_limit)
|
||||
: plan_(plan),
|
||||
cursor_(plan->plan().MakeCursor(execution_memory)),
|
||||
frame_(plan->symbol_table().max_position(), execution_memory) {
|
||||
frame_(plan->symbol_table().max_position(), execution_memory),
|
||||
memory_limit_(memory_limit) {
|
||||
ctx_.db_accessor = dba;
|
||||
ctx_.symbol_table = plan->symbol_table();
|
||||
ctx_.evaluation_context.timestamp =
|
||||
@ -657,21 +660,25 @@ std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<
|
||||
// single `Pull`.
|
||||
constexpr size_t stack_size = 256 * 1024;
|
||||
char stack_data[stack_size];
|
||||
utils::ResourceWithOutOfMemoryException resource_with_exception;
|
||||
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
|
||||
// We can throw on every query because a simple queries for deleting will use only
|
||||
// the stack allocated buffer.
|
||||
// Also, we want to throw only when the query engine requests more memory and not the storage
|
||||
// so we add the exception to the allocator.
|
||||
// TODO (mferencevic): Tune the parameters accordingly.
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
|
||||
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
|
||||
|
||||
if (memory_limit_) {
|
||||
maybe_limited_resource.emplace(&pool_memory, *memory_limit_);
|
||||
ctx_.evaluation_context.memory = &*maybe_limited_resource;
|
||||
} else {
|
||||
ctx_.evaluation_context.memory = &pool_memory;
|
||||
}
|
||||
|
||||
// Returns true if a result was pulled.
|
||||
const auto pull_result = [&]() -> bool {
|
||||
// We can throw on every query because a simple queries for deleting will use only
|
||||
// the stack allocated buffer.
|
||||
// Also, we want to throw only when the query engine requests more memory and not the storage
|
||||
// so we add the exception to the allocator.
|
||||
utils::ResourceWithOutOfMemoryException resource_with_exception;
|
||||
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
|
||||
// TODO (mferencevic): Tune the parameters accordingly.
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
|
||||
ctx_.evaluation_context.memory = &pool_memory;
|
||||
|
||||
return cursor_->Pull(frame_, ctx_);
|
||||
};
|
||||
const auto pull_result = [&]() -> bool { return cursor_->Pull(frame_, ctx_); };
|
||||
|
||||
const auto stream_values = [&]() {
|
||||
// TODO: The streamed values should also probably use the above memory.
|
||||
@ -829,9 +836,23 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
|
||||
PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
|
||||
InterpreterContext *interpreter_context, DbAccessor *dba,
|
||||
utils::MonotonicBufferResource *execution_memory) {
|
||||
auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage),
|
||||
utils::Downcast<CypherQuery>(parsed_query.query), parsed_query.parameters,
|
||||
&interpreter_context->plan_cache, dba, parsed_query.is_cacheable);
|
||||
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
|
||||
|
||||
Frame frame(0);
|
||||
SymbolTable symbol_table;
|
||||
EvaluationContext evaluation_context;
|
||||
evaluation_context.timestamp =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
evaluation_context.parameters = parsed_query.parameters;
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::View::OLD);
|
||||
const auto memory_limit = EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
|
||||
if (memory_limit) {
|
||||
spdlog::info("Running query with memory limit of {}", utils::GetReadableSize(*memory_limit));
|
||||
}
|
||||
|
||||
auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query,
|
||||
parsed_query.parameters, &interpreter_context->plan_cache, dba);
|
||||
|
||||
summary->insert_or_assign("cost_estimate", plan->cost());
|
||||
auto rw_type_checker = plan::ReadWriteTypeChecker();
|
||||
@ -850,8 +871,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
|
||||
utils::FindOr(parsed_query.stripped_query.named_expressions(), symbol.token_position(), symbol.name()).first);
|
||||
}
|
||||
|
||||
auto pull_plan =
|
||||
std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context, execution_memory);
|
||||
auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context,
|
||||
execution_memory, memory_limit);
|
||||
return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
|
||||
[pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary](
|
||||
AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
|
||||
@ -949,6 +970,15 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
|
||||
|
||||
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
|
||||
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in PROFILE");
|
||||
Frame frame(0);
|
||||
SymbolTable symbol_table;
|
||||
EvaluationContext evaluation_context;
|
||||
evaluation_context.timestamp =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
evaluation_context.parameters = parsed_inner_query.parameters;
|
||||
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::View::OLD);
|
||||
const auto memory_limit = EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
|
||||
|
||||
auto cypher_query_plan = CypherQueryToPlan(
|
||||
parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query,
|
||||
@ -960,14 +990,14 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
|
||||
{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"},
|
||||
std::move(parsed_query.required_privileges),
|
||||
[plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), summary, dba,
|
||||
interpreter_context, execution_memory,
|
||||
interpreter_context, execution_memory, memory_limit,
|
||||
// We want to execute the query we are profiling lazily, so we delay
|
||||
// the construction of the corresponding context.
|
||||
ctx = std::optional<ExecutionContext>{}, pull_plan = std::shared_ptr<PullPlanVector>(nullptr)](
|
||||
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
|
||||
// No output symbols are given so that nothing is streamed.
|
||||
if (!ctx) {
|
||||
ctx = PullPlan(plan, parameters, true, dba, interpreter_context, execution_memory)
|
||||
ctx = PullPlan(plan, parameters, true, dba, interpreter_context, execution_memory, memory_limit)
|
||||
.Pull(stream, {}, {}, summary);
|
||||
pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(ctx->stats, ctx->profile_execution_time));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user