From a586f2f98db0455c07879459c5ea0eaa3e7f91fe Mon Sep 17 00:00:00 2001 From: Antonio Filipovic <61245998+antoniofilipovic@users.noreply.github.com> Date: Tue, 4 Apr 2023 16:54:08 +0200 Subject: [PATCH] Change EvalContext and QueryExecution to use PoolResource on LOAD CSV (#825) * Change PullPlan to use specific PoolResource for LOAD CSV --- src/query/interpreter.cpp | 127 ++++++++++++++++++++++++++------------ src/query/interpreter.hpp | 29 +++++++-- 2 files changed, 112 insertions(+), 44 deletions(-) diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 4135aa1a9..a94bef1c9 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -67,6 +68,7 @@ #include "utils/settings.hpp" #include "utils/string.hpp" #include "utils/tsc.hpp" +#include "utils/typeinfo.hpp" #include "utils/variant_helpers.hpp" namespace EventCounter { @@ -981,7 +983,8 @@ struct PullPlan { DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, std::optional username, std::atomic *transaction_status, TriggerContextCollector *trigger_context_collector = nullptr, - std::optional memory_limit = {}); + std::optional memory_limit = {}, bool use_monotonic_memory = true); + std::optional Pull(AnyStream *stream, std::optional n, const std::vector &output_symbols, std::map *summary); @@ -1005,16 +1008,25 @@ struct PullPlan { // we have to keep track of any unsent results from previous `PullPlan::Pull` // manually by using this flag. bool has_unsent_results_ = false; + + // In the case of LOAD CSV, we want to use only PoolResource without MonotonicMemoryResource + // to reuse allocated memory. As LOAD CSV is processing row by row + // it is possible to reduce memory usage significantly if MemoryResource deals with memory allocation + // can reuse memory that was allocated on processing the first row on all subsequent rows. + // This flag signals to `PullPlan::Pull` which MemoryResource to use + bool use_monotonic_memory_; }; PullPlan::PullPlan(const std::shared_ptr plan, const Parameters ¶meters, const bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, std::optional username, std::atomic *transaction_status, - TriggerContextCollector *trigger_context_collector, const std::optional memory_limit) + TriggerContextCollector *trigger_context_collector, const std::optional memory_limit, + bool use_monotonic_memory) : plan_(plan), cursor_(plan->plan().MakeCursor(execution_memory)), frame_(plan->symbol_table().max_position(), execution_memory), - memory_limit_(memory_limit) { + memory_limit_(memory_limit), + use_monotonic_memory_(use_monotonic_memory) { ctx_.db_accessor = dba; ctx_.symbol_table = plan->symbol_table(); ctx_.evaluation_context.timestamp = QueryTimestamp(); @@ -1050,21 +1062,28 @@ std::optional PullPlan::Pull(AnyStream *strea // single `Pull`. static constexpr size_t stack_size = 256UL * 1024UL; char stack_data[stack_size]; - utils::ResourceWithOutOfMemoryException resource_with_exception; - utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception); - // We can throw on every query because a simple queries for deleting will use only - // the stack allocated buffer. - // Also, we want to throw only when the query engine requests more memory and not the storage - // so we add the exception to the allocator. - // TODO (mferencevic): Tune the parameters accordingly. - utils::PoolResource pool_memory(128, 1024, &monotonic_memory, utils::NewDeleteResource()); - std::optional maybe_limited_resource; + utils::ResourceWithOutOfMemoryException resource_with_exception; + utils::MonotonicBufferResource monotonic_memory{&stack_data[0], stack_size, &resource_with_exception}; + std::optional pool_memory; + + if (!use_monotonic_memory_) { + pool_memory.emplace(8, kExecutionPoolMaxBlockSize, utils::NewDeleteResource(), utils::NewDeleteResource()); + } else { + // We can throw on every query because a simple queries for deleting will use only + // the stack allocated buffer. + // Also, we want to throw only when the query engine requests more memory and not the storage + // so we add the exception to the allocator. + // TODO (mferencevic): Tune the parameters accordingly. + pool_memory.emplace(128, 1024, &monotonic_memory, utils::NewDeleteResource()); + } + + std::optional maybe_limited_resource; if (memory_limit_) { - maybe_limited_resource.emplace(&pool_memory, *memory_limit_); + maybe_limited_resource.emplace(&*pool_memory, *memory_limit_); ctx_.evaluation_context.memory = &*maybe_limited_resource; } else { - ctx_.evaluation_context.memory = &pool_memory; + ctx_.evaluation_context.memory = &*pool_memory; } // Returns true if a result was pulled. @@ -1224,16 +1243,19 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::mapsingle_query_->clauses_; std::any_of( - clauses.begin(), clauses.end(), [](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) { + auto clauses = cypher_query->single_query_->clauses_; + bool contains_csv = false; + if (std::any_of(clauses.begin(), clauses.end(), + [](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) { notifications->emplace_back( SeverityLevel::INFO, NotificationCode::LOAD_CSV_TIP, "It's important to note that the parser parses the values as strings. It's up to the user to " "convert the parsed row values to the appropriate type. This can be done using the built-in " "conversion functions such as ToInteger, ToFloat, ToBoolean etc."); + contains_csv = true; } - + // If this is LOAD CSV query, use PoolResource without MonotonicMemoryResource as we want to reuse allocated memory + auto use_monotonic_memory = !contains_csv; auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, parsed_query.parameters, parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, dba); @@ -1256,7 +1278,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map(plan, parsed_query.parameters, false, dba, interpreter_context, execution_memory, StringPointerToOptional(username), transaction_status, - trigger_context_collector, memory_limit); + trigger_context_collector, memory_limit, use_monotonic_memory); return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges), [pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary]( AnyStream *stream, std::optional n) -> std::optional { @@ -2600,18 +2622,18 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, std::optional user = StringPointerToOptional(username); username_ = user; - query_executions_.emplace_back(std::make_unique()); - auto &query_execution = query_executions_.back(); - - std::optional qid = - in_explicit_transaction_ ? static_cast(query_executions_.size() - 1) : std::optional{}; - // Handle transaction control queries. const auto upper_case_query = utils::ToUpperCase(query_string); const auto trimmed_query = utils::Trim(upper_case_query); if (trimmed_query == "BEGIN" || trimmed_query == "COMMIT" || trimmed_query == "ROLLBACK") { + query_executions_.emplace_back( + std::make_unique(utils::MonotonicBufferResource(kExecutionMemoryBlockSize))); + auto &query_execution = query_executions_.back(); + std::optional qid = + in_explicit_transaction_ ? static_cast(query_executions_.size() - 1) : std::optional{}; + query_execution->prepared_query.emplace(PrepareTransactionQuery(trimmed_query)); return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid}; } @@ -2627,18 +2649,43 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, // If we're not in an explicit transaction block and we have an open // transaction, abort it since we're about to prepare a new query. else if (db_accessor_) { - AbortCommand(&query_execution); + query_executions_.emplace_back( + std::make_unique(utils::MonotonicBufferResource(kExecutionMemoryBlockSize))); + AbortCommand(&query_executions_.back()); } - + std::unique_ptr *query_execution_ptr = nullptr; try { - // Set a default cost estimate of 0. Individual queries can overwrite this - // field with an improved estimate. - query_execution->summary["cost_estimate"] = 0.0; - + query_executions_.emplace_back( + std::make_unique(utils::MonotonicBufferResource(kExecutionMemoryBlockSize))); + query_execution_ptr = &query_executions_.back(); utils::Timer parsing_timer; ParsedQuery parsed_query = ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query); - query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count(); + TypedValue parsing_time{parsing_timer.Elapsed().count()}; + + if (utils::Downcast(parsed_query.query)) { + auto *cypher_query = utils::Downcast(parsed_query.query); + if (const auto &clauses = cypher_query->single_query_->clauses_; + std::any_of(clauses.begin(), clauses.end(), + [](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) { + // Using PoolResource without MonotonicMemoryResouce for LOAD CSV reduces memory usage. + // QueryExecution MemoryResource is mostly used for allocations done on Frame and storing `row`s + query_executions_[query_executions_.size() - 1] = std::make_unique( + utils::PoolResource(1, kExecutionPoolMaxBlockSize, utils::NewDeleteResource(), utils::NewDeleteResource())); + query_execution_ptr = &query_executions_.back(); + } + } + + auto &query_execution = query_executions_.back(); + + std::optional qid = + in_explicit_transaction_ ? static_cast(query_executions_.size() - 1) : std::optional{}; + + query_execution->summary["parsing_time"] = std::move(parsing_time); + + // Set a default cost estimate of 0. Individual queries can overwrite this + // field with an improved estimate. + query_execution->summary["cost_estimate"] = 0.0; // Some queries require an active transaction in order to be prepared. if (!in_explicit_transaction_ && @@ -2658,12 +2705,14 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, utils::Timer planning_timer; PreparedQuery prepared_query; - + utils::MemoryResource *memory_resource = + std::visit([](auto &execution_memory) -> utils::MemoryResource * { return &execution_memory; }, + query_execution->execution_memory); if (utils::Downcast(parsed_query.query)) { - prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, - &*execution_db_accessor_, &query_execution->execution_memory, - &query_execution->notifications, username, &transaction_status_, - trigger_context_collector_ ? &*trigger_context_collector_ : nullptr); + prepared_query = + PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, + &*execution_db_accessor_, memory_resource, &query_execution->notifications, username, + &transaction_status_, trigger_context_collector_ ? &*trigger_context_collector_ : nullptr); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, &*execution_db_accessor_, &query_execution->execution_memory_with_exception); @@ -2673,7 +2722,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, &*execution_db_accessor_, &query_execution->execution_memory_with_exception, username, &transaction_status_); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_, - &query_execution->execution_memory); + memory_resource); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, interpreter_context_); @@ -2745,7 +2794,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid}; } catch (const utils::BasicException &) { EventCounter::IncrementCounter(EventCounter::FailedQuery); - AbortCommand(&query_execution); + AbortCommand(query_execution_ptr); throw; } } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index fc9842df3..53d8b7e60 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -51,6 +51,7 @@ extern const Event FailedQuery; namespace memgraph::query { inline constexpr size_t kExecutionMemoryBlockSize = 1UL * 1024UL * 1024UL; +inline constexpr size_t kExecutionPoolMaxBlockSize = 32768UL; // 2 ^ 15 class AuthQueryHandler { public: @@ -340,13 +341,29 @@ class Interpreter final { private: struct QueryExecution { std::optional prepared_query; - utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize}; - utils::ResourceWithOutOfMemoryException execution_memory_with_exception{&execution_memory}; + std::variant execution_memory; + utils::ResourceWithOutOfMemoryException execution_memory_with_exception; std::map summary; std::vector notifications; - explicit QueryExecution() = default; + explicit QueryExecution(utils::MonotonicBufferResource monotonic_memory) + : execution_memory(std::move(monotonic_memory)) { + std::visit( + [&](auto &memory_resource) { + execution_memory_with_exception = utils::ResourceWithOutOfMemoryException(&memory_resource); + }, + execution_memory); + }; + + explicit QueryExecution(utils::PoolResource pool_resource) : execution_memory(std::move(pool_resource)) { + std::visit( + [&](auto &memory_resource) { + execution_memory_with_exception = utils::ResourceWithOutOfMemoryException(&memory_resource); + }, + execution_memory); + }; + QueryExecution(const QueryExecution &) = delete; QueryExecution(QueryExecution &&) = default; QueryExecution &operator=(const QueryExecution &) = delete; @@ -357,7 +374,7 @@ class Interpreter final { // destroy the prepared query which is using that instance // of execution memory. prepared_query.reset(); - execution_memory.Release(); + std::visit([](auto &memory_resource) { memory_resource.Release(); }, execution_memory); } }; @@ -445,7 +462,9 @@ std::map Interpreter::Pull(TStream *result_stream, std: try { // Wrap the (statically polymorphic) stream type into a common type which // the handler knows. - AnyStream stream{result_stream, &query_execution->execution_memory}; + AnyStream stream{result_stream, + std::visit([](auto &execution_memory) -> utils::MemoryResource * { return &execution_memory; }, + query_execution->execution_memory)}; const auto maybe_res = query_execution->prepared_query->query_handler(&stream, n); // Stream is using execution memory of the query_execution which // can be deleted after its execution so the stream should be cleared