From 8de31092ad3607d700bdfe0c11322cddd1d8709b Mon Sep 17 00:00:00 2001
From: antonio2368 <antonio2368@users.noreply.github.com>
Date: Tue, 13 Apr 2021 10:41:50 +0200
Subject: [PATCH] Add back the query memory limit logic (#134)

---
 src/query/interpreter.cpp | 76 +++++++++++++++++++++++++++------------
 1 file changed, 53 insertions(+), 23 deletions(-)

diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index 54cafbd53..4bd606cdc 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -27,6 +27,7 @@
 #include "utils/logging.hpp"
 #include "utils/memory.hpp"
 #include "utils/memory_tracker.hpp"
+#include "utils/readable_size.hpp"
 #include "utils/string.hpp"
 #include "utils/tsc.hpp"
 
@@ -604,7 +605,7 @@ struct PullPlanVector {
 struct PullPlan {
   explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters &parameters, bool is_profile_query,
                     DbAccessor *dba, InterpreterContext *interpreter_context,
-                    utils::MonotonicBufferResource *execution_memory);
+                    utils::MonotonicBufferResource *execution_memory, std::optional<size_t> memory_limit = {});
   std::optional<ExecutionContext> Pull(AnyStream *stream, std::optional<int> n,
                                        const std::vector<Symbol> &output_symbols,
                                        std::map<std::string, TypedValue> *summary);
@@ -614,6 +615,7 @@ struct PullPlan {
   plan::UniqueCursorPtr cursor_ = nullptr;
   Frame frame_;
   ExecutionContext ctx_;
+  std::optional<size_t> memory_limit_;
 
   // As it's possible to query execution using multiple pulls
   // we need the keep track of the total execution time across
@@ -631,10 +633,11 @@ struct PullPlan {
 
 PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
                    DbAccessor *dba, InterpreterContext *interpreter_context,
-                   utils::MonotonicBufferResource *execution_memory)
+                   utils::MonotonicBufferResource *execution_memory, const std::optional<size_t> memory_limit)
     : plan_(plan),
       cursor_(plan->plan().MakeCursor(execution_memory)),
-      frame_(plan->symbol_table().max_position(), execution_memory) {
+      frame_(plan->symbol_table().max_position(), execution_memory),
+      memory_limit_(memory_limit) {
   ctx_.db_accessor = dba;
   ctx_.symbol_table = plan->symbol_table();
   ctx_.evaluation_context.timestamp =
@@ -657,21 +660,25 @@ std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<
   // single `Pull`.
   constexpr size_t stack_size = 256 * 1024;
   char stack_data[stack_size];
+  utils::ResourceWithOutOfMemoryException resource_with_exception;
+  utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
+  // We can throw on every query because a simple queries for deleting will use only
+  // the stack allocated buffer.
+  // Also, we want to throw only when the query engine requests more memory and not the storage
+  // so we add the exception to the allocator.
+  // TODO (mferencevic): Tune the parameters accordingly.
+  utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
+  std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
+
+  if (memory_limit_) {
+    maybe_limited_resource.emplace(&pool_memory, *memory_limit_);
+    ctx_.evaluation_context.memory = &*maybe_limited_resource;
+  } else {
+    ctx_.evaluation_context.memory = &pool_memory;
+  }
 
   // Returns true if a result was pulled.
-  const auto pull_result = [&]() -> bool {
-    // We can throw on every query because a simple queries for deleting will use only
-    // the stack allocated buffer.
-    // Also, we want to throw only when the query engine requests more memory and not the storage
-    // so we add the exception to the allocator.
-    utils::ResourceWithOutOfMemoryException resource_with_exception;
-    utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
-    // TODO (mferencevic): Tune the parameters accordingly.
-    utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
-    ctx_.evaluation_context.memory = &pool_memory;
-
-    return cursor_->Pull(frame_, ctx_);
-  };
+  const auto pull_result = [&]() -> bool { return cursor_->Pull(frame_, ctx_); };
 
   const auto stream_values = [&]() {
     // TODO: The streamed values should also probably use the above memory.
@@ -829,9 +836,23 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
 PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
                                  InterpreterContext *interpreter_context, DbAccessor *dba,
                                  utils::MonotonicBufferResource *execution_memory) {
-  auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage),
-                                utils::Downcast<CypherQuery>(parsed_query.query), parsed_query.parameters,
-                                &interpreter_context->plan_cache, dba, parsed_query.is_cacheable);
+  auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
+
+  Frame frame(0);
+  SymbolTable symbol_table;
+  EvaluationContext evaluation_context;
+  evaluation_context.timestamp =
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
+          .count();
+  evaluation_context.parameters = parsed_query.parameters;
+  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::View::OLD);
+  const auto memory_limit = EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
+  if (memory_limit) {
+    spdlog::info("Running query with memory limit of {}", utils::GetReadableSize(*memory_limit));
+  }
+
+  auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query,
+                                parsed_query.parameters, &interpreter_context->plan_cache, dba);
 
   summary->insert_or_assign("cost_estimate", plan->cost());
   auto rw_type_checker = plan::ReadWriteTypeChecker();
@@ -850,8 +871,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
         utils::FindOr(parsed_query.stripped_query.named_expressions(), symbol.token_position(), symbol.name()).first);
   }
 
-  auto pull_plan =
-      std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context, execution_memory);
+  auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context,
+                                              execution_memory, memory_limit);
   return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
                        [pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary](
                            AnyStream *stream, std::optional<int> n) -> std::optional<QueryHandlerResult> {
@@ -949,6 +970,15 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
 
   auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
   MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in PROFILE");
+  Frame frame(0);
+  SymbolTable symbol_table;
+  EvaluationContext evaluation_context;
+  evaluation_context.timestamp =
+      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
+          .count();
+  evaluation_context.parameters = parsed_inner_query.parameters;
+  ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::View::OLD);
+  const auto memory_limit = EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
 
   auto cypher_query_plan = CypherQueryToPlan(
       parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query,
@@ -960,14 +990,14 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
       {"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"},
       std::move(parsed_query.required_privileges),
       [plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), summary, dba,
-       interpreter_context, execution_memory,
+       interpreter_context, execution_memory, memory_limit,
        // We want to execute the query we are profiling lazily, so we delay
        // the construction of the corresponding context.
        ctx = std::optional<ExecutionContext>{}, pull_plan = std::shared_ptr<PullPlanVector>(nullptr)](
           AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
         // No output symbols are given so that nothing is streamed.
         if (!ctx) {
-          ctx = PullPlan(plan, parameters, true, dba, interpreter_context, execution_memory)
+          ctx = PullPlan(plan, parameters, true, dba, interpreter_context, execution_memory, memory_limit)
                     .Pull(stream, {}, {}, summary);
           pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(ctx->stats, ctx->profile_execution_time));
         }