Change EvalContext and QueryExecution to use PoolResource on LOAD CSV (#825)

* Change PullPlan to use specific PoolResource for LOAD CSV
This commit is contained in:
Antonio Filipovic 2023-04-04 16:54:08 +02:00 committed by GitHub
parent 9fc51f74a0
commit a586f2f98d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 112 additions and 44 deletions

View File

@ -20,6 +20,7 @@
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <optional>
#include <thread>
#include <unordered_map>
@ -67,6 +68,7 @@
#include "utils/settings.hpp"
#include "utils/string.hpp"
#include "utils/tsc.hpp"
#include "utils/typeinfo.hpp"
#include "utils/variant_helpers.hpp"
namespace EventCounter {
@ -981,7 +983,8 @@ struct PullPlan {
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status,
TriggerContextCollector *trigger_context_collector = nullptr,
std::optional<size_t> memory_limit = {});
std::optional<size_t> memory_limit = {}, bool use_monotonic_memory = true);
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
const std::vector<Symbol> &output_symbols,
std::map<std::string, TypedValue> *summary);
@ -1005,16 +1008,25 @@ struct PullPlan {
// we have to keep track of any unsent results from previous `PullPlan::Pull`
// manually by using this flag.
bool has_unsent_results_ = false;
// In the case of LOAD CSV, we want to use only PoolResource without MonotonicMemoryResource
// to reuse allocated memory. As LOAD CSV is processing row by row
// it is possible to reduce memory usage significantly if MemoryResource deals with memory allocation
// can reuse memory that was allocated on processing the first row on all subsequent rows.
// This flag signals to `PullPlan::Pull` which MemoryResource to use
bool use_monotonic_memory_;
};
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status,
TriggerContextCollector *trigger_context_collector, const std::optional<size_t> memory_limit)
TriggerContextCollector *trigger_context_collector, const std::optional<size_t> memory_limit,
bool use_monotonic_memory)
: plan_(plan),
cursor_(plan->plan().MakeCursor(execution_memory)),
frame_(plan->symbol_table().max_position(), execution_memory),
memory_limit_(memory_limit) {
memory_limit_(memory_limit),
use_monotonic_memory_(use_monotonic_memory) {
ctx_.db_accessor = dba;
ctx_.symbol_table = plan->symbol_table();
ctx_.evaluation_context.timestamp = QueryTimestamp();
@ -1050,21 +1062,28 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
// single `Pull`.
static constexpr size_t stack_size = 256UL * 1024UL;
char stack_data[stack_size];
utils::ResourceWithOutOfMemoryException resource_with_exception;
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
// We can throw on every query because a simple queries for deleting will use only
// the stack allocated buffer.
// Also, we want to throw only when the query engine requests more memory and not the storage
// so we add the exception to the allocator.
// TODO (mferencevic): Tune the parameters accordingly.
utils::PoolResource pool_memory(128, 1024, &monotonic_memory, utils::NewDeleteResource());
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
utils::ResourceWithOutOfMemoryException resource_with_exception;
utils::MonotonicBufferResource monotonic_memory{&stack_data[0], stack_size, &resource_with_exception};
std::optional<utils::PoolResource> pool_memory;
if (!use_monotonic_memory_) {
pool_memory.emplace(8, kExecutionPoolMaxBlockSize, utils::NewDeleteResource(), utils::NewDeleteResource());
} else {
// We can throw on every query because a simple queries for deleting will use only
// the stack allocated buffer.
// Also, we want to throw only when the query engine requests more memory and not the storage
// so we add the exception to the allocator.
// TODO (mferencevic): Tune the parameters accordingly.
pool_memory.emplace(128, 1024, &monotonic_memory, utils::NewDeleteResource());
}
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
if (memory_limit_) {
maybe_limited_resource.emplace(&pool_memory, *memory_limit_);
maybe_limited_resource.emplace(&*pool_memory, *memory_limit_);
ctx_.evaluation_context.memory = &*maybe_limited_resource;
} else {
ctx_.evaluation_context.memory = &pool_memory;
ctx_.evaluation_context.memory = &*pool_memory;
}
// Returns true if a result was pulled.
@ -1224,16 +1243,19 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
if (memory_limit) {
spdlog::info("Running query with memory limit of {}", utils::GetReadableSize(*memory_limit));
}
if (const auto &clauses = cypher_query->single_query_->clauses_; std::any_of(
clauses.begin(), clauses.end(), [](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
auto clauses = cypher_query->single_query_->clauses_;
bool contains_csv = false;
if (std::any_of(clauses.begin(), clauses.end(),
[](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
notifications->emplace_back(
SeverityLevel::INFO, NotificationCode::LOAD_CSV_TIP,
"It's important to note that the parser parses the values as strings. It's up to the user to "
"convert the parsed row values to the appropriate type. This can be done using the built-in "
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
contains_csv = true;
}
// If this is LOAD CSV query, use PoolResource without MonotonicMemoryResource as we want to reuse allocated memory
auto use_monotonic_memory = !contains_csv;
auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query,
parsed_query.parameters,
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, dba);
@ -1256,7 +1278,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
}
auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context,
execution_memory, StringPointerToOptional(username), transaction_status,
trigger_context_collector, memory_limit);
trigger_context_collector, memory_limit, use_monotonic_memory);
return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
[pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary](
AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
@ -2600,18 +2622,18 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
std::optional<std::string> user = StringPointerToOptional(username);
username_ = user;
query_executions_.emplace_back(std::make_unique<QueryExecution>());
auto &query_execution = query_executions_.back();
std::optional<int> qid =
in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
// Handle transaction control queries.
const auto upper_case_query = utils::ToUpperCase(query_string);
const auto trimmed_query = utils::Trim(upper_case_query);
if (trimmed_query == "BEGIN" || trimmed_query == "COMMIT" || trimmed_query == "ROLLBACK") {
query_executions_.emplace_back(
std::make_unique<QueryExecution>(utils::MonotonicBufferResource(kExecutionMemoryBlockSize)));
auto &query_execution = query_executions_.back();
std::optional<int> qid =
in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
query_execution->prepared_query.emplace(PrepareTransactionQuery(trimmed_query));
return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid};
}
@ -2627,18 +2649,43 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
// If we're not in an explicit transaction block and we have an open
// transaction, abort it since we're about to prepare a new query.
else if (db_accessor_) {
AbortCommand(&query_execution);
query_executions_.emplace_back(
std::make_unique<QueryExecution>(utils::MonotonicBufferResource(kExecutionMemoryBlockSize)));
AbortCommand(&query_executions_.back());
}
std::unique_ptr<QueryExecution> *query_execution_ptr = nullptr;
try {
// Set a default cost estimate of 0. Individual queries can overwrite this
// field with an improved estimate.
query_execution->summary["cost_estimate"] = 0.0;
query_executions_.emplace_back(
std::make_unique<QueryExecution>(utils::MonotonicBufferResource(kExecutionMemoryBlockSize)));
query_execution_ptr = &query_executions_.back();
utils::Timer parsing_timer;
ParsedQuery parsed_query =
ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query);
query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count();
TypedValue parsing_time{parsing_timer.Elapsed().count()};
if (utils::Downcast<CypherQuery>(parsed_query.query)) {
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
if (const auto &clauses = cypher_query->single_query_->clauses_;
std::any_of(clauses.begin(), clauses.end(),
[](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
// Using PoolResource without MonotonicMemoryResouce for LOAD CSV reduces memory usage.
// QueryExecution MemoryResource is mostly used for allocations done on Frame and storing `row`s
query_executions_[query_executions_.size() - 1] = std::make_unique<QueryExecution>(
utils::PoolResource(1, kExecutionPoolMaxBlockSize, utils::NewDeleteResource(), utils::NewDeleteResource()));
query_execution_ptr = &query_executions_.back();
}
}
auto &query_execution = query_executions_.back();
std::optional<int> qid =
in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
query_execution->summary["parsing_time"] = std::move(parsing_time);
// Set a default cost estimate of 0. Individual queries can overwrite this
// field with an improved estimate.
query_execution->summary["cost_estimate"] = 0.0;
// Some queries require an active transaction in order to be prepared.
if (!in_explicit_transaction_ &&
@ -2658,12 +2705,14 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
utils::Timer planning_timer;
PreparedQuery prepared_query;
utils::MemoryResource *memory_resource =
std::visit([](auto &execution_memory) -> utils::MemoryResource * { return &execution_memory; },
query_execution->execution_memory);
if (utils::Downcast<CypherQuery>(parsed_query.query)) {
prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, &query_execution->execution_memory,
&query_execution->notifications, username, &transaction_status_,
trigger_context_collector_ ? &*trigger_context_collector_ : nullptr);
prepared_query =
PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, memory_resource, &query_execution->notifications, username,
&transaction_status_, trigger_context_collector_ ? &*trigger_context_collector_ : nullptr);
} else if (utils::Downcast<ExplainQuery>(parsed_query.query)) {
prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, &query_execution->execution_memory_with_exception);
@ -2673,7 +2722,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
&*execution_db_accessor_, &query_execution->execution_memory_with_exception, username, &transaction_status_);
} else if (utils::Downcast<DumpQuery>(parsed_query.query)) {
prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_,
&query_execution->execution_memory);
memory_resource);
} else if (utils::Downcast<IndexQuery>(parsed_query.query)) {
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
&query_execution->notifications, interpreter_context_);
@ -2745,7 +2794,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid};
} catch (const utils::BasicException &) {
EventCounter::IncrementCounter(EventCounter::FailedQuery);
AbortCommand(&query_execution);
AbortCommand(query_execution_ptr);
throw;
}
}

View File

@ -51,6 +51,7 @@ extern const Event FailedQuery;
namespace memgraph::query {
inline constexpr size_t kExecutionMemoryBlockSize = 1UL * 1024UL * 1024UL;
inline constexpr size_t kExecutionPoolMaxBlockSize = 32768UL; // 2 ^ 15
class AuthQueryHandler {
public:
@ -340,13 +341,29 @@ class Interpreter final {
private:
struct QueryExecution {
std::optional<PreparedQuery> prepared_query;
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
utils::ResourceWithOutOfMemoryException execution_memory_with_exception{&execution_memory};
std::variant<utils::MonotonicBufferResource, utils::PoolResource> execution_memory;
utils::ResourceWithOutOfMemoryException execution_memory_with_exception;
std::map<std::string, TypedValue> summary;
std::vector<Notification> notifications;
explicit QueryExecution() = default;
explicit QueryExecution(utils::MonotonicBufferResource monotonic_memory)
: execution_memory(std::move(monotonic_memory)) {
std::visit(
[&](auto &memory_resource) {
execution_memory_with_exception = utils::ResourceWithOutOfMemoryException(&memory_resource);
},
execution_memory);
};
explicit QueryExecution(utils::PoolResource pool_resource) : execution_memory(std::move(pool_resource)) {
std::visit(
[&](auto &memory_resource) {
execution_memory_with_exception = utils::ResourceWithOutOfMemoryException(&memory_resource);
},
execution_memory);
};
QueryExecution(const QueryExecution &) = delete;
QueryExecution(QueryExecution &&) = default;
QueryExecution &operator=(const QueryExecution &) = delete;
@ -357,7 +374,7 @@ class Interpreter final {
// destroy the prepared query which is using that instance
// of execution memory.
prepared_query.reset();
execution_memory.Release();
std::visit([](auto &memory_resource) { memory_resource.Release(); }, execution_memory);
}
};
@ -445,7 +462,9 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
try {
// Wrap the (statically polymorphic) stream type into a common type which
// the handler knows.
AnyStream stream{result_stream, &query_execution->execution_memory};
AnyStream stream{result_stream,
std::visit([](auto &execution_memory) -> utils::MemoryResource * { return &execution_memory; },
query_execution->execution_memory)};
const auto maybe_res = query_execution->prepared_query->query_handler(&stream, n);
// Stream is using execution memory of the query_execution which
// can be deleted after its execution so the stream should be cleared