diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index df3c525a8..70f03e2f6 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -187,17 +187,6 @@ class SingleNodeLogicalPlan final : public LogicalPlan { CachedPlan::CachedPlan(std::unique_ptr plan) : plan_(std::move(plan)) {} -void Interpreter::PrettyPrintPlan(const DbAccessor &dba, - const plan::LogicalOperator *plan_root, - std::ostream *out) { - plan::PrettyPrint(dba, plan_root, out); -} - -std::string Interpreter::PlanToJson(const DbAccessor &dba, - const plan::LogicalOperator *plan_root) { - return plan::PlanToJson(dba, plan_root).dump(); -} - struct Callback { std::vector header; std::function>()> fn; @@ -768,8 +757,7 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) std::pair, std::vector> Interpreter::Interpret(const std::string &query, const std::map ¶ms) { - // Clear pending results. - results_ = std::nullopt; + prepared_query_ = std::nullopt; execution_memory_.Release(); // Check the query for transaction commands. @@ -820,329 +808,596 @@ Interpreter::Interpret(const std::string &query, execution_db_accessor_.emplace(&*db_accessor_); } - // Clear leftover results. - results_ = std::nullopt; + prepared_query_ = std::nullopt; execution_memory_.Release(); - // Interpret the query and return the headers. + // Prepare the query and return the headers. try { - results_.emplace(Prepare(query, params, &*execution_db_accessor_)); - return {results_->header(), results_->privileges()}; + Prepare(query, params); + return {prepared_query_->header, prepared_query_->privileges}; } catch (const utils::BasicException &) { AbortCommand(); throw; } } -Interpreter::Results Interpreter::Prepare( - const std::string &query_string, - const std::map ¶ms, - DbAccessor *db_accessor) { - std::map summary; +ExecutionContext PullAllPlan(AnyStream *stream, const CachedPlan &plan, + const Parameters ¶meters, + const std::vector &output_symbols, + bool is_profile_query, + std::map *summary, + DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { + auto cursor = plan.plan().MakeCursor(execution_memory); + Frame frame(plan.symbol_table().max_position(), execution_memory); - utils::Timer parsing_timer; - ParsedQuery parsed_query = - ParseQuery(query_string, params, &interpreter_context_->ast_cache, - &interpreter_context_->antlr_lock); - auto parsing_time = parsing_timer.Elapsed(); - summary["parsing_time"] = parsing_time.count(); - // TODO: set summary['type'] based on transaction metadata - // the type can't be determined based only on top level LogicalOp - // (for example MATCH DELETE RETURN will have Produce as it's top). - // For now always use "rw" because something must be set, but it doesn't - // have to be correct (for Bolt clients). - summary["type"] = "rw"; + // Set up temporary memory for a single Pull. Initial memory comes from the + // stack. 256 KiB should fit on the stack and should be more than enough for a + // single `Pull`. + constexpr size_t stack_size = 256 * 1024; + char stack_data[stack_size]; - utils::Timer planning_timer; + ExecutionContext ctx; + ctx.db_accessor = dba; + ctx.symbol_table = plan.symbol_table(); + ctx.evaluation_context.timestamp = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + ctx.evaluation_context.parameters = parameters; + ctx.evaluation_context.properties = + NamesToProperties(plan.ast_storage().properties_, dba); + ctx.evaluation_context.labels = + NamesToLabels(plan.ast_storage().labels_, dba); + ctx.is_profile_query = is_profile_query; - // This local shared_ptr might be the only owner of the CachedPlan, so - // we must ensure it lives during the whole interpretation. - std::shared_ptr plan{nullptr}; + utils::Timer timer; -#ifdef MG_SINGLE_NODE_HA - { - InfoQuery *info_query = nullptr; - if (!db_accessor->raft()->IsLeader() && - (!(info_query = utils::Downcast(parsed_query.query)) || - info_query->info_type_ != InfoQuery::InfoType::RAFT)) { - throw raft::CantExecuteQueries(); - } - } -#endif + while (true) { + utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size); + // TODO (mferencevic): Tune the parameters accordingly. + utils::PoolResource pool_memory(128, 1024, &monotonic_memory); + ctx.evaluation_context.memory = &pool_memory; - if (auto *cypher_query = utils::Downcast(parsed_query.query)) { - plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), cypher_query, - std::move(parsed_query.ast_storage), - parsed_query.parameters, db_accessor); - auto planning_time = planning_timer.Elapsed(); - summary["planning_time"] = planning_time.count(); - summary["cost_estimate"] = plan->cost(); - - auto output_symbols = plan->plan().OutputSymbols(plan->symbol_table()); - - std::vector header; - header.reserve(output_symbols.size()); - - for (const auto &symbol : output_symbols) { - // When the symbol is aliased or expanded from '*' (inside RETURN or - // WITH), then there is no token position, so use symbol name. - // Otherwise, find the name from stripped query. - header.push_back( - utils::FindOr(parsed_query.stripped_query.named_expressions(), - symbol.token_position(), symbol.name()) - .first); + if (!cursor->Pull(frame, ctx)) { + break; } - return Results(db_accessor, parsed_query.parameters, plan, - std::move(output_symbols), std::move(header), - std::move(summary), parsed_query.required_privileges, - &execution_memory_); - } + if (!output_symbols.empty()) { + // TODO: The streamed values should also probably use the above memory. + std::vector values; + values.reserve(output_symbols.size()); - if (utils::IsSubtype(*parsed_query.query, ExplainQuery::kType)) { - const std::string kExplainQueryStart = "explain "; - CHECK(utils::StartsWith( - utils::ToLowerCase(parsed_query.stripped_query.query()), - kExplainQueryStart)) - << "Expected stripped query to start with '" << kExplainQueryStart - << "'"; - - // We want to cache the Cypher query that appears within this "metaquery". - // However, we can't just use the hash of that Cypher query string (as the - // cache key) but then continue to use the AST that was constructed with the - // full string. The parameters within the AST are looked up using their - // token positions, which depend on the query string as they're computed at - // the time the query string is parsed. So, for example, if one first runs - // EXPLAIN (or PROFILE) on a Cypher query and *then* runs the same Cypher - // query standalone, the second execution will crash because the cached AST - // (constructed using the first query string but cached using the substring - // (equivalent to the second query string)) will use the old token - // positions. For that reason, we fully strip and parse the substring as - // well. - // - // Note that the stripped subquery string's hash will be equivalent to the - // hash of the stripped query as if it was run standalone. This guarantees - // that we will reuse any cached plans from before, rather than create a new - // one every time. This is important because the planner takes the values of - // the query parameters into account when planning and might produce a - // totally different plan if we were to create a new one right now. Doing so - // would result in discrepancies between the explained (or profiled) plan - // and the one that's executed when the query is ran standalone. - ParsedQuery parsed_query = ParseQuery( - query_string.substr(kExplainQueryStart.size()), params, - &interpreter_context_->ast_cache, &interpreter_context_->antlr_lock); - auto *cypher_query = utils::Downcast(parsed_query.query); - CHECK(cypher_query) - << "Cypher grammar should not allow other queries in EXPLAIN"; - std::shared_ptr cypher_query_plan = - CypherQueryToPlan(parsed_query.stripped_query.hash(), cypher_query, - std::move(parsed_query.ast_storage), - parsed_query.parameters, db_accessor); - - std::stringstream printed_plan; - PrettyPrintPlan(*db_accessor, &cypher_query_plan->plan(), &printed_plan); - - std::vector> printed_plan_rows; - for (const auto &row : - utils::Split(utils::RTrim(printed_plan.str()), "\n")) { - printed_plan_rows.push_back(std::vector{TypedValue(row)}); - } - - summary["explain"] = PlanToJson(*db_accessor, &cypher_query_plan->plan()); - - SymbolTable symbol_table; - auto query_plan_symbol = symbol_table.CreateSymbol("QUERY PLAN", false); - std::vector output_symbols{query_plan_symbol}; - - auto output_plan = - std::make_unique(output_symbols, printed_plan_rows); - - plan = std::make_shared(std::make_unique( - std::move(output_plan), 0.0, AstStorage{}, symbol_table)); - - auto planning_time = planning_timer.Elapsed(); - summary["planning_time"] = planning_time.count(); - - std::vector header{query_plan_symbol.name()}; - - return Results(db_accessor, parsed_query.parameters, plan, - std::move(output_symbols), std::move(header), - std::move(summary), parsed_query.required_privileges, - &execution_memory_); - } - - if (utils::IsSubtype(*parsed_query.query, ProfileQuery::kType)) { - const std::string kProfileQueryStart = "profile "; - CHECK(utils::StartsWith( - utils::ToLowerCase(parsed_query.stripped_query.query()), - kProfileQueryStart)) - << "Expected stripped query to start with '" << kProfileQueryStart - << "'"; - - if (in_explicit_transaction_) { - throw ProfileInMulticommandTxException(); - } - - if (!interpreter_context_->is_tsc_available) { - throw QueryException("TSC support is missing for PROFILE"); - } - - // See the comment regarding the caching of Cypher queries within - // "metaqueries" for explain queries - ParsedQuery parsed_query = ParseQuery( - query_string.substr(kProfileQueryStart.size()), params, - &interpreter_context_->ast_cache, &interpreter_context_->antlr_lock); - auto *cypher_query = utils::Downcast(parsed_query.query); - CHECK(cypher_query) - << "Cypher grammar should not allow other queries in PROFILE"; - auto cypher_query_plan = - CypherQueryToPlan(parsed_query.stripped_query.hash(), cypher_query, - std::move(parsed_query.ast_storage), - parsed_query.parameters, db_accessor); - - // Copy the symbol table and add our own symbols (used by the `OutputTable` - // operator below) - SymbolTable symbol_table(cypher_query_plan->symbol_table()); - - auto operator_symbol = symbol_table.CreateSymbol("OPERATOR", false); - auto actual_hits_symbol = symbol_table.CreateSymbol("ACTUAL HITS", false); - auto relative_time_symbol = - symbol_table.CreateSymbol("RELATIVE TIME", false); - auto absolute_time_symbol = - symbol_table.CreateSymbol("ABSOLUTE TIME", false); - - std::vector output_symbols = {operator_symbol, actual_hits_symbol, - relative_time_symbol, - absolute_time_symbol}; - std::vector header{ - operator_symbol.name(), actual_hits_symbol.name(), - relative_time_symbol.name(), absolute_time_symbol.name()}; - - auto output_plan = std::make_unique( - output_symbols, - [cypher_query_plan](Frame *frame, ExecutionContext *context) { - utils::MonotonicBufferResource execution_memory(1 * 1024 * 1024); - auto cursor = cypher_query_plan->plan().MakeCursor(&execution_memory); - - // We are pulling from another plan, so set up the EvaluationContext - // correctly. The rest of the context should be good for sharing. - context->evaluation_context.properties = - NamesToProperties(cypher_query_plan->ast_storage().properties_, - context->db_accessor); - context->evaluation_context.labels = NamesToLabels( - cypher_query_plan->ast_storage().labels_, context->db_accessor); - - // Pull everything to profile the execution - utils::Timer timer; - while (cursor->Pull(*frame, *context)) continue; - auto execution_time = timer.Elapsed(); - - context->profile_execution_time = execution_time; - - return ProfilingStatsToTable(context->stats, execution_time); - }); - - plan = std::make_shared(std::make_unique( - std::move(output_plan), 0.0, AstStorage{}, symbol_table)); - - auto planning_time = planning_timer.Elapsed(); - summary["planning_time"] = planning_time.count(); - - return Results(db_accessor, parsed_query.parameters, plan, - std::move(output_symbols), std::move(header), - std::move(summary), parsed_query.required_privileges, - &execution_memory_, - /* is_profile_query */ true, /* should_abort_query */ true); - } - - if (auto *dump_query = utils::Downcast(parsed_query.query)) { -#ifndef MG_SINGLE_NODE_HA - SymbolTable symbol_table; - auto query_symbol = symbol_table.CreateSymbol("QUERY", false); - - std::vector output_symbols = {query_symbol}; - std::vector header = {query_symbol.name()}; - - auto output_plan = std::make_unique( - output_symbols, DumpClosure(db_accessor)); - plan = std::make_shared(std::make_unique( - std::move(output_plan), 0.0, AstStorage{}, symbol_table)); - - summary["planning_time"] = planning_timer.Elapsed().count(); - - return Results(db_accessor, parsed_query.parameters, plan, - std::move(output_symbols), std::move(header), - std::move(summary), parsed_query.required_privileges, - &execution_memory_, - /* is_profile_query */ false, - /* should_abort_query */ false); -#else - throw utils::NotYetImplemented("Dump database"); -#endif - } - - Callback callback; - if (auto *index_query = utils::Downcast(parsed_query.query)) { - if (in_explicit_transaction_) { - throw IndexInMulticommandTxException(); - } - // Creating an index influences computed plan costs. - auto invalidate_plan_cache = [plan_cache = - &this->interpreter_context_->plan_cache] { - auto access = plan_cache->access(); - for (auto &kv : access) { - access.remove(kv.first); + for (const auto &symbol : output_symbols) { + values.emplace_back(frame[symbol]); } - }; - callback = - HandleIndexQuery(index_query, invalidate_plan_cache, db_accessor); - } else if (auto *auth_query = - utils::Downcast(parsed_query.query)) { -#ifdef MG_SINGLE_NODE_HA - throw utils::NotYetImplemented( - "Managing user privileges is not yet supported in Memgraph HA " - "instance."); -#else - if (in_explicit_transaction_) { - throw UserModificationInMulticommandTxException(); + + stream->Result(values); } - callback = HandleAuthQuery(auth_query, interpreter_context_->auth, - parsed_query.parameters, db_accessor); -#endif - } else if (auto *info_query = - utils::Downcast(parsed_query.query)) { - callback = HandleInfoQuery(info_query, db_accessor); - } else if (auto *constraint_query = - utils::Downcast(parsed_query.query)) { - callback = HandleConstraintQuery(constraint_query, db_accessor); - } else { - LOG(FATAL) << "Should not get here -- unknown query type!"; } + summary->insert_or_assign("plan_execution_time", timer.Elapsed().count()); + cursor->Shutdown(); + + return ctx; +} + +/** + * Convert a parsed *Cypher* query's AST into a logical plan. + * + * The created logical plan will take ownership of the `AstStorage` within + * `ParsedQuery` and might modify it during planning. + */ +std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, + CypherQuery *query, + const Parameters ¶meters, + DbAccessor *db_accessor) { + auto vertex_counts = plan::MakeVertexCountCache(db_accessor); + auto symbol_table = MakeSymbolTable(query); + auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, + query, &vertex_counts); + std::unique_ptr root; + double cost; + std::tie(root, cost) = plan::MakeLogicalPlan(&planning_context, parameters, + FLAGS_query_cost_planner); + return std::make_unique( + std::move(root), cost, std::move(ast_storage), std::move(symbol_table)); +} + +/** + * Return the parsed *Cypher* query's AST cached logical plan, or create and + * cache a fresh one if it doesn't yet exist. + */ +std::shared_ptr CypherQueryToPlan( + HashType hash, AstStorage ast_storage, CypherQuery *query, + const Parameters ¶meters, utils::SkipList *plan_cache, + DbAccessor *db_accessor) { + auto plan_cache_access = plan_cache->access(); + auto it = plan_cache_access.find(hash); + if (it != plan_cache_access.end()) { + if (it->second->IsExpired()) { + plan_cache_access.remove(hash); + } else { + return it->second; + } + } + return plan_cache_access + .insert({hash, + std::make_shared(MakeLogicalPlan( + std::move(ast_storage), (query), parameters, db_accessor))}) + .first->second; +} + +PreparedQuery PrepareCypherQuery( + ParsedQuery parsed_query, std::map *summary, + InterpreterContext *interpreter_context, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { + auto plan = CypherQueryToPlan( + parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), + utils::Downcast(parsed_query.query), parsed_query.parameters, + &interpreter_context->plan_cache, dba); + + summary->insert_or_assign("cost_estimate", plan->cost()); + + auto output_symbols = plan->plan().OutputSymbols(plan->symbol_table()); + + std::vector header; + header.reserve(output_symbols.size()); + + for (const auto &symbol : output_symbols) { + // When the symbol is aliased or expanded from '*' (inside RETURN or + // WITH), then there is no token position, so use symbol name. + // Otherwise, find the name from stripped query. + header.push_back( + utils::FindOr(parsed_query.stripped_query.named_expressions(), + symbol.token_position(), symbol.name()) + .first); + } + + return PreparedQuery{ + std::move(header), std::move(parsed_query.required_privileges), + [plan = std::move(plan), parameters = std::move(parsed_query.parameters), + output_symbols = std::move(output_symbols), summary, dba, + execution_memory](AnyStream *stream) { + PullAllPlan(stream, *plan, parameters, output_symbols, false, summary, + dba, execution_memory); + return true; + }}; +} + +PreparedQuery PrepareExplainQuery( + ParsedQuery parsed_query, std::map *summary, + InterpreterContext *interpreter_context, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { + const std::string kExplainQueryStart = "explain "; + + CHECK( + utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), + kExplainQueryStart)) + << "Expected stripped query to start with '" << kExplainQueryStart << "'"; + + // Parse and cache the inner query separately (as if it was a standalone + // query), producing a fresh AST. Note that currently we cannot just reuse + // part of the already produced AST because the parameters within ASTs are + // looked up using their positions within the string that was parsed. These + // wouldn't match up if if we were to reuse the AST (produced by parsing the + // full query string) when given just the inner query to execute. + ParsedQuery parsed_inner_query = + ParseQuery(parsed_query.query_string.substr(kExplainQueryStart.size()), + parsed_query.user_parameters, &interpreter_context->ast_cache, + &interpreter_context->antlr_lock); + + auto *cypher_query = utils::Downcast(parsed_inner_query.query); + CHECK(cypher_query) + << "Cypher grammar should not allow other queries in EXPLAIN"; + + auto cypher_query_plan = CypherQueryToPlan( + parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), + utils::Downcast(parsed_query.query), parsed_query.parameters, + &interpreter_context->plan_cache, dba); + + std::stringstream printed_plan; + plan::PrettyPrint(*dba, &cypher_query_plan->plan(), &printed_plan); + + std::vector> printed_plan_rows; + for (const auto &row : utils::Split(utils::RTrim(printed_plan.str()), "\n")) { + printed_plan_rows.push_back(std::vector{TypedValue(row)}); + } + + summary->insert_or_assign( + "explain", plan::PlanToJson(*dba, &cypher_query_plan->plan()).dump()); + + SymbolTable symbol_table; + auto query_plan_symbol = symbol_table.CreateSymbol("QUERY PLAN", false); + std::vector output_symbols{query_plan_symbol}; + + auto output_plan = + std::make_unique(output_symbols, printed_plan_rows); + + auto plan = + std::make_shared(std::make_unique( + std::move(output_plan), 0.0, AstStorage{}, symbol_table)); + + std::vector header{query_plan_symbol.name()}; + + return PreparedQuery{std::move(header), + std::move(parsed_query.required_privileges), + [plan = std::move(plan), + parameters = std::move(parsed_inner_query.parameters), + output_symbols = std::move(output_symbols), summary, + dba, execution_memory](AnyStream *stream) { + PullAllPlan(stream, *plan, parameters, output_symbols, + false, summary, dba, execution_memory); + return true; + }}; +} + +PreparedQuery PrepareProfileQuery( + ParsedQuery parsed_query, bool in_explicit_transaction, + std::map *summary, + InterpreterContext *interpreter_context, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { + const std::string kProfileQueryStart = "profile "; + + CHECK( + utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), + kProfileQueryStart)) + << "Expected stripped query to start with '" << kProfileQueryStart << "'"; + + if (in_explicit_transaction) { + throw ProfileInMulticommandTxException(); + } + + if (interpreter_context->is_tsc_available) { + throw QueryException("TSC support is missing for PROFILE"); + } + + // Parse and cache the inner query separately (as if it was a standalone + // query), producing a fresh AST. Note that currently we cannot just reuse + // part of the already produced AST because the parameters within ASTs are + // looked up using their positions within the string that was parsed. These + // wouldn't match up if if we were to reuse the AST (produced by parsing the + // full query string) when given just the inner query to execute. + ParsedQuery parsed_inner_query = + ParseQuery(parsed_query.query_string.substr(kProfileQueryStart.size()), + parsed_query.user_parameters, &interpreter_context->ast_cache, + &interpreter_context->antlr_lock); + + auto *cypher_query = utils::Downcast(parsed_inner_query.query); + CHECK(cypher_query) + << "Cypher grammar should not allow other queries in PROFILE"; + + auto cypher_query_plan = CypherQueryToPlan( + parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), + utils::Downcast(parsed_query.query), parsed_query.parameters, + &interpreter_context->plan_cache, dba); + + // Copy the symbol table and add our own symbols (used by the `OutputTable` + // operator below) + SymbolTable symbol_table(cypher_query_plan->symbol_table()); + + auto operator_symbol = symbol_table.CreateSymbol("OPERATOR", false); + auto actual_hits_symbol = symbol_table.CreateSymbol("ACTUAL HITS", false); + auto relative_time_symbol = symbol_table.CreateSymbol("RELATIVE TIME", false); + auto absolute_time_symbol = symbol_table.CreateSymbol("ABSOLUTE TIME", false); + + std::vector output_symbols = {operator_symbol, actual_hits_symbol, + relative_time_symbol, + absolute_time_symbol}; + std::vector header{ + operator_symbol.name(), actual_hits_symbol.name(), + relative_time_symbol.name(), absolute_time_symbol.name()}; + + auto output_plan = std::make_unique( + output_symbols, + [cypher_query_plan](Frame *frame, ExecutionContext *context) { + utils::MonotonicBufferResource execution_memory(1 * 1024 * 1024); + auto cursor = cypher_query_plan->plan().MakeCursor(&execution_memory); + + // We are pulling from another plan, so set up the EvaluationContext + // correctly. The rest of the context should be good for sharing. + context->evaluation_context.properties = NamesToProperties( + cypher_query_plan->ast_storage().properties_, context->db_accessor); + context->evaluation_context.labels = NamesToLabels( + cypher_query_plan->ast_storage().labels_, context->db_accessor); + + // Pull everything to profile the execution + utils::Timer timer; + while (cursor->Pull(*frame, *context)) continue; + auto execution_time = timer.Elapsed(); + + context->profile_execution_time = execution_time; + + return ProfilingStatsToTable(context->stats, execution_time); + }); + + auto plan = + std::make_shared(std::make_unique( + std::move(output_plan), 0.0, AstStorage{}, symbol_table)); + + return PreparedQuery{ + std::move(header), std::move(parsed_query.required_privileges), + [plan = std::move(plan), + parameters = std::move(parsed_inner_query.parameters), + output_symbols = std::move(output_symbols), summary, dba, + execution_memory](AnyStream *stream) { + auto ctx = PullAllPlan(stream, *plan, parameters, output_symbols, true, + summary, dba, execution_memory); + + summary->insert_or_assign( + "profile", + ProfilingStatsToJson(ctx.stats, ctx.profile_execution_time).dump()); + + return false; + }}; +} + +PreparedQuery PrepareDumpQuery( + ParsedQuery parsed_query, std::map *summary, + InterpreterContext *interpreter_context, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { +#ifndef MG_SINGLE_NODE_HA + SymbolTable symbol_table; + auto query_symbol = symbol_table.CreateSymbol("QUERY", false); + + std::vector output_symbols = {query_symbol}; + std::vector header = {query_symbol.name()}; + + auto output_plan = std::make_unique( + output_symbols, DumpClosure(dba)); + auto plan = + std::make_shared(std::make_unique( + std::move(output_plan), 0.0, AstStorage{}, symbol_table)); + + return PreparedQuery{ + std::move(header), std::move(parsed_query.required_privileges), + [plan = std::move(plan), parameters = std::move(parsed_query.parameters), + output_symbols = std::move(output_symbols), summary, dba, + execution_memory](AnyStream *stream) { + PullAllPlan(stream, *plan, parameters, output_symbols, false, summary, + dba, execution_memory); + return true; + }}; +#else + throw utils::NotYetImplemented("Dump database"); +#endif +} + +PreparedQuery PrepareIndexQuery( + ParsedQuery parsed_query, bool in_explicit_transaction, + std::map *summary, + InterpreterContext *interpreter_context, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { + if (in_explicit_transaction) { + throw IndexInMulticommandTxException(); + } + + auto *index_query = utils::Downcast(parsed_query.query); + + // Creating an index influences computed plan costs. + auto invalidate_plan_cache = [plan_cache = &interpreter_context->plan_cache] { + auto access = plan_cache->access(); + for (auto &kv : access) { + access.remove(kv.first); + } + }; + + auto callback = HandleIndexQuery(index_query, invalidate_plan_cache, dba); + SymbolTable symbol_table; std::vector output_symbols; for (const auto &column : callback.header) { output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false")); } - plan = std::make_shared(std::make_unique( - std::make_unique( - output_symbols, - [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), - 0.0, AstStorage{}, symbol_table)); + auto plan = + std::make_shared(std::make_unique( + std::make_unique( + output_symbols, + [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), + 0.0, AstStorage{}, symbol_table)); - auto planning_time = planning_timer.Elapsed(); - summary["planning_time"] = planning_time.count(); - summary["cost_estimate"] = 0.0; + return PreparedQuery{callback.header, + std::move(parsed_query.required_privileges), + [callback = std::move(callback), plan = std::move(plan), + parameters = std::move(parsed_query.parameters), + output_symbols = std::move(output_symbols), summary, + dba, execution_memory](AnyStream *stream) { + PullAllPlan(stream, *plan, parameters, output_symbols, + false, summary, dba, execution_memory); + return !callback.should_abort_query; + }}; +} - return Results(db_accessor, parsed_query.parameters, plan, - std::move(output_symbols), callback.header, std::move(summary), - parsed_query.required_privileges, &execution_memory_, - /* is_profile_query */ false, callback.should_abort_query); +PreparedQuery PrepareAuthQuery( + ParsedQuery parsed_query, bool in_explicit_transaction, + std::map *summary, + InterpreterContext *interpreter_context, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { +#ifdef MG_SINGLE_NODE_HA + throw utils::NotYetImplemented( + "Managing user privileges is not yet supported in Memgraph HA " + "instance."); +#else + if (in_explicit_transaction) { + throw UserModificationInMulticommandTxException(); + } + + auto *auth_query = utils::Downcast(parsed_query.query); + + auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, + parsed_query.parameters, dba); + + SymbolTable symbol_table; + std::vector output_symbols; + for (const auto &column : callback.header) { + output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false")); + } + + auto plan = + std::make_shared(std::make_unique( + std::make_unique( + output_symbols, + [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), + 0.0, AstStorage{}, symbol_table)); + + return PreparedQuery{callback.header, + std::move(parsed_query.required_privileges), + + [callback = std::move(callback), plan = std::move(plan), + parameters = std::move(parsed_query.parameters), + output_symbols = std::move(output_symbols), summary, + dba, execution_memory](AnyStream *stream) { + PullAllPlan(stream, *plan, parameters, output_symbols, + false, summary, dba, execution_memory); + return !callback.should_abort_query; + }}; +#endif +} + +PreparedQuery PrepareInfoQuery( + ParsedQuery parsed_query, std::map *summary, + InterpreterContext *interpreter_context, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { + auto *info_query = utils::Downcast(parsed_query.query); + + auto callback = HandleInfoQuery(info_query, dba); + + SymbolTable symbol_table; + std::vector output_symbols; + for (const auto &column : callback.header) { + output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false")); + } + + auto plan = + std::make_shared(std::make_unique( + std::make_unique( + output_symbols, + [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), + 0.0, AstStorage{}, symbol_table)); + + return PreparedQuery{callback.header, + std::move(parsed_query.required_privileges), + [callback = std::move(callback), plan = std::move(plan), + parameters = std::move(parsed_query.parameters), + output_symbols = std::move(output_symbols), summary, + dba, execution_memory](AnyStream *stream) { + PullAllPlan(stream, *plan, parameters, output_symbols, + false, summary, dba, execution_memory); + return !callback.should_abort_query; + }}; +} + +PreparedQuery PrepareConstraintQuery( + ParsedQuery parsed_query, std::map *summary, + InterpreterContext *interpreter_context, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory) { + auto *constraint_query = utils::Downcast(parsed_query.query); + + auto callback = HandleConstraintQuery(constraint_query, dba); + + SymbolTable symbol_table; + std::vector output_symbols; + for (const auto &column : callback.header) { + output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false")); + } + + auto plan = + std::make_shared(std::make_unique( + std::make_unique( + output_symbols, + [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), + 0.0, AstStorage{}, symbol_table)); + + return PreparedQuery{callback.header, + std::move(parsed_query.required_privileges), + [callback = std::move(callback), plan = std::move(plan), + parameters = std::move(parsed_query.parameters), + output_symbols = std::move(output_symbols), summary, + dba, execution_memory](AnyStream *stream) { + PullAllPlan(stream, *plan, parameters, output_symbols, + false, summary, dba, execution_memory); + return !callback.should_abort_query; + }}; +} + +void Interpreter::Prepare(const std::string &query_string, + const std::map ¶ms) { + summary_.clear(); + + // TODO: Set summary['type'] based on transaction metadata. The type can't be + // determined based only on the toplevel logical operator -- for example + // `MATCH DELETE RETURN`, which is a write query, will have `Produce` as its + // toplevel operator). For now we always set "rw" because something must be + // set, but it doesn't have to be correct (for Bolt clients). + summary_["type"] = "rw"; + + // Set a default cost estimate of 0. Individual queries can overwrite this + // field with an improved estimate. + summary_["cost_estimate"] = 0.0; + + utils::Timer parsing_timer; + ParsedQuery parsed_query = + ParseQuery(query_string, params, &interpreter_context_->ast_cache, + &interpreter_context_->antlr_lock); + summary_["parsing_time"] = parsing_timer.Elapsed().count(); + +#ifdef MG_SINGLE_NODE_HA + { + InfoQuery *info_query = nullptr; + if (!execution_db_accessor_->raft()->IsLeader() && + (!(info_query = utils::Downcast(parsed_query.query)) || + info_query->info_type_ != InfoQuery::InfoType::RAFT)) { + throw raft::CantExecuteQueries(); + } + } +#endif + + utils::Timer planning_timer; + PreparedQuery prepared_query; + + if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareCypherQuery( + std::move(parsed_query), &summary_, interpreter_context_, + &*execution_db_accessor_, &execution_memory_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareExplainQuery( + std::move(parsed_query), &summary_, interpreter_context_, + &*execution_db_accessor_, &execution_memory_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareProfileQuery( + std::move(parsed_query), in_explicit_transaction_, &summary_, + interpreter_context_, &*execution_db_accessor_, &execution_memory_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareDumpQuery( + std::move(parsed_query), &summary_, interpreter_context_, + &*execution_db_accessor_, &execution_memory_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareIndexQuery( + std::move(parsed_query), in_explicit_transaction_, &summary_, + interpreter_context_, &*execution_db_accessor_, &execution_memory_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareAuthQuery( + std::move(parsed_query), in_explicit_transaction_, &summary_, + interpreter_context_, &*execution_db_accessor_, &execution_memory_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareInfoQuery( + std::move(parsed_query), &summary_, interpreter_context_, + &*execution_db_accessor_, &execution_memory_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareConstraintQuery( + std::move(parsed_query), &summary_, interpreter_context_, + &*execution_db_accessor_, &execution_memory_); + } else { + LOG(FATAL) << "Should not get here -- unknown query type!"; + } + + summary_["planning_time"] = planning_timer.Elapsed().count(); + prepared_query_ = std::move(prepared_query); } void Interpreter::Abort() { - results_ = std::nullopt; + prepared_query_ = std::nullopt; execution_memory_.Release(); expect_rollback_ = false; in_explicit_transaction_ = false; @@ -1153,7 +1408,7 @@ void Interpreter::Abort() { } void Interpreter::Commit() { - results_ = std::nullopt; + prepared_query_ = std::nullopt; execution_memory_.Release(); if (!db_accessor_) return; #ifdef MG_SINGLE_NODE_V2 @@ -1178,14 +1433,14 @@ void Interpreter::Commit() { } void Interpreter::AdvanceCommand() { - results_ = std::nullopt; + prepared_query_ = std::nullopt; execution_memory_.Release(); if (!db_accessor_) return; db_accessor_->AdvanceCommand(); } void Interpreter::AbortCommand() { - results_ = std::nullopt; + prepared_query_ = std::nullopt; execution_memory_.Release(); if (in_explicit_transaction_) { expect_rollback_ = true; @@ -1194,40 +1449,4 @@ void Interpreter::AbortCommand() { } } -std::shared_ptr Interpreter::CypherQueryToPlan( - HashType query_hash, CypherQuery *query, AstStorage ast_storage, - const Parameters ¶meters, DbAccessor *db_accessor) { - auto plan_cache_access = interpreter_context_->plan_cache.access(); - auto it = plan_cache_access.find(query_hash); - if (it != plan_cache_access.end()) { - if (it->second->IsExpired()) { - plan_cache_access.remove(query_hash); - } else { - return it->second; - } - } - return plan_cache_access - .insert({query_hash, - std::make_shared(MakeLogicalPlan( - query, std::move(ast_storage), parameters, db_accessor))}) - .first->second; -} - -std::unique_ptr Interpreter::MakeLogicalPlan( - CypherQuery *query, AstStorage ast_storage, const Parameters ¶meters, - DbAccessor *db_accessor) { - auto vertex_counts = plan::MakeVertexCountCache(db_accessor); - - auto symbol_table = MakeSymbolTable(query); - - auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, - query, &vertex_counts); - std::unique_ptr root; - double cost; - std::tie(root, cost) = plan::MakeLogicalPlan(&planning_context, parameters, - FLAGS_query_cost_planner); - return std::make_unique( - std::move(root), cost, std::move(ast_storage), std::move(symbol_table)); -} - } // namespace query diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 849d26a48..f17010a50 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -11,7 +11,6 @@ #include "query/frontend/stripped.hpp" #include "query/interpret/frame.hpp" #include "query/plan/operator.hpp" -#include "utils/likely.hpp" #include "utils/memory.hpp" #include "utils/skip_list.hpp" #include "utils/spin_lock.hpp" @@ -29,6 +28,60 @@ namespace query { static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U; +/** + * `AnyStream` can wrap *any* type implementing the `Stream` concept into a + * single type. + * + * The type erasure technique is used. The original type which an `AnyStream` + * was constructed from is "erased", as `AnyStream` is not a class template and + * doesn't use the type in any way. Client code can then program just for + * `AnyStream`, rather than using static polymorphism to handle any type + * implementing the `Stream` concept. + */ +class AnyStream final { + public: + template + AnyStream(TStream *stream, utils::MemoryResource *memory_resource) + : content_{utils::Allocator>{memory_resource} + .template new_object>(stream), + [memory_resource](Wrapper *ptr) { + utils::Allocator>{memory_resource} + .template delete_object>( + static_cast *>(ptr)); + }} {} + + void Result(const std::vector &values) { + content_->Result(values); + } + + private: + struct Wrapper { + virtual void Result(const std::vector &values) = 0; + }; + + template + struct GenericWrapper final : public Wrapper { + explicit GenericWrapper(TStream *stream) : stream_{stream} {} + + void Result(const std::vector &values) override { + stream_->Result(values); + } + + TStream *stream_; + }; + + std::unique_ptr> content_; +}; + +/** + * A container for data related to the preparation of a query. + */ +struct PreparedQuery { + std::vector header; + std::vector privileges; + std::function query_handler; +}; + // TODO: Maybe this should move to query/plan/planner. /// Interface for accessing the root operator of a logical plan. class LogicalPlan { @@ -121,10 +174,10 @@ struct InterpreterContext { database::GraphDb *db; #endif - // Antlr has singleton instance that is shared between threads. It is - // protected by locks inside of antlr. Unfortunately, they are not protected - // in a very good way. Once we have antlr version without race conditions we - // can remove this lock. This will probably never happen since antlr + // ANTLR has singleton instance that is shared between threads. It is + // protected by locks inside of ANTLR. Unfortunately, they are not protected + // in a very good way. Once we have ANTLR version without race conditions we + // can remove this lock. This will probably never happen since ANTLR // developers introduce more bugs in each version. Fortunately, we have // cache so this lock probably won't impact performance much... utils::SpinLock antlr_lock; @@ -136,208 +189,58 @@ struct InterpreterContext { utils::SkipList plan_cache; }; -class Interpreter { +class Interpreter final { public: - /** - * Encapsulates all what's necessary for the interpretation of a query - * into a single object that can be pulled (into the given Stream). - */ - class Results { - friend Interpreter; - Results(DbAccessor *db_accessor, const query::Parameters ¶meters, - std::shared_ptr plan, - std::vector output_symbols, std::vector header, - std::map summary, - std::vector privileges, - utils::MemoryResource *execution_memory, - bool is_profile_query = false, bool should_abort_query = false) - : ctx_{db_accessor}, - plan_(plan), - cursor_(plan_->plan().MakeCursor(execution_memory)), - frame_(plan_->symbol_table().max_position(), execution_memory), - output_symbols_(std::move(output_symbols)), - header_(std::move(header)), - summary_(std::move(summary)), - privileges_(std::move(privileges)), - should_abort_query_(should_abort_query) { - ctx_.is_profile_query = is_profile_query; - ctx_.symbol_table = plan_->symbol_table(); - ctx_.evaluation_context.timestamp = - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - ctx_.evaluation_context.parameters = parameters; - ctx_.evaluation_context.properties = - NamesToProperties(plan_->ast_storage().properties_, db_accessor); - ctx_.evaluation_context.labels = - NamesToLabels(plan_->ast_storage().labels_, db_accessor); - } - - public: - Results(const Results &) = delete; - Results(Results &&) = default; - Results &operator=(const Results &) = delete; - Results &operator=(Results &&) = default; - - /** - * Make the interpreter perform a single Pull. Results (if they exists) are - * pushed into the given stream. On first Pull the header is written to the - * stream, on last the summary. - * - * @param stream - The stream to push the header, results and summary into. - * @return - If this Results is eligible for another Pull. If Pulling - * after `false` has been returned, the behavior is undefined. - * @tparam TStream - Stream type. - */ - template - bool Pull(TStream &stream) { - utils::Timer timer; - // Setup temporary memory for a single Pull. Initial memory should come - // from stack, 256 KiB should fit on the stack and should be more than - // enough for a single Pull. - constexpr size_t stack_size = 256 * 1024; - char stack_data[stack_size]; - utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size); - // TODO (mferencevic): Tune the parameters accordingly. - utils::PoolResource pool_memory(128, 1024, &monotonic_memory); - ctx_.evaluation_context.memory = &pool_memory; - // We can now Pull a result. - bool return_value = cursor_->Pull(frame_, ctx_); - if (return_value && !output_symbols_.empty()) { - // TODO: The streamed values should also probably use the above memory. - std::vector values; - values.reserve(output_symbols_.size()); - for (const auto &symbol : output_symbols_) { - values.emplace_back(frame_[symbol]); - } - stream.Result(values); - } - execution_time_ += timer.Elapsed().count(); - - if (!return_value) { - summary_["plan_execution_time"] = execution_time_; - - if (ctx_.is_profile_query) { - summary_["profile"] = - ProfilingStatsToJson(ctx_.stats, ctx_.profile_execution_time) - .dump(); - } - - cursor_->Shutdown(); - } - - return return_value; - } - - /** Calls Pull() until exhausted. */ - template - void PullAll(TStream &stream) { - while (Pull(stream)) continue; - } - - const std::vector &header() const & { return header_; } - std::vector &&header() && { return std::move(header_); } - const std::map &summary() const & { - return summary_; - } - std::map &&summary() && { - return std::move(summary_); - } - - const std::vector &privileges() { - return privileges_; - } - - bool ShouldAbortQuery() const { return should_abort_query_; } - - private: - ExecutionContext ctx_; - std::shared_ptr plan_; - query::plan::UniqueCursorPtr cursor_; - Frame frame_; - std::vector output_symbols_; - - std::vector header_; - std::map summary_; - - double execution_time_{0}; - - std::vector privileges_; - - bool should_abort_query_; - }; - explicit Interpreter(InterpreterContext *interpreter_context); Interpreter(const Interpreter &) = delete; Interpreter &operator=(const Interpreter &) = delete; Interpreter(Interpreter &&) = delete; Interpreter &operator=(Interpreter &&) = delete; - - virtual ~Interpreter() { Abort(); } + ~Interpreter() { Abort(); } std::pair, std::vector> Interpret(const std::string &query, const std::map ¶ms); /** - * Generates an Results object for the parameters. The resulting object - * can be Pulled with its results written to an arbitrary stream. + * Prepare a query for execution. + * + * To prepare a query for execution means to preprocess the query and adjust + * the state of the `Interpreter` in such a way so that the next call to + * `PullAll` executes the query. + * + * @throw raft::CantExecuteQueries if the Memgraph instance is not a Raft + * leader and a query other than an Info Raft query was given + * @throw query::QueryException */ - virtual Results Prepare(const std::string &query, - const std::map ¶ms, - DbAccessor *db_accessor); + void Prepare(const std::string &query, + const std::map ¶ms); + /** + * Execute the last prepared query and stream *all* of the results into the + * given stream. + * + * TStream should be a type implementing the `Stream` concept, i.e. it should + * contain the member function `void Result(const std::vector &)`. + * The provided vector argument is valid only for the duration of the call to + * `Result`. The stream should make an explicit copy if it wants to use it + * further. + * + * @throw utils::BasicException + * @throw query::QueryException + */ template - std::map PullAll(TStream *result_stream) { - // If we don't have any results (eg. a transaction command preceeded), - // return an empty summary. - if (UNLIKELY(!results_)) return {}; - - // Stream all results and return the summary. - try { - results_->PullAll(*result_stream); - // Make a copy of the summary because the `Commit` call will destroy the - // `results_` object. - auto summary = results_->summary(); - if (!in_explicit_transaction_) { - if (results_->ShouldAbortQuery()) { - Abort(); - } else { - Commit(); - } - } - - return summary; -#ifdef MG_SINGLE_NODE_HA - } catch (const query::HintedAbortError &) { - AbortCommand(); - throw utils::BasicException("Transaction was asked to abort."); -#endif - } catch (const utils::BasicException &) { - AbortCommand(); - throw; - } - } + std::map PullAll(TStream *result_stream); + /** + * Abort the current multicommand transaction. + */ void Abort(); - protected: - // high level tree -> logical plan - // AstStorage and SymbolTable may be modified during planning. The created - // LogicalPlan must take ownership of AstStorage and SymbolTable. - virtual std::unique_ptr MakeLogicalPlan(CypherQuery *, - AstStorage, - const Parameters &, - DbAccessor *); - - virtual void PrettyPrintPlan(const DbAccessor &, - const plan::LogicalOperator *, std::ostream *); - - virtual std::string PlanToJson(const DbAccessor &, - const plan::LogicalOperator *); - private: InterpreterContext *interpreter_context_; + std::optional prepared_query_; + std::map summary_; #ifdef MG_SINGLE_NODE_V2 std::optional db_accessor_; @@ -345,11 +248,6 @@ class Interpreter { std::optional db_accessor_; #endif std::optional execution_db_accessor_; - // The `query::Interpreter::Results` object MUST be destroyed before the - // `database::GraphDbAccessor` is destroyed because the `Results` object holds - // references to the `GraphDb` object and will crash the database when - // destructed if you are not careful. - std::optional results_; bool in_explicit_transaction_{false}; bool expect_rollback_{false}; utils::MonotonicBufferResource execution_memory_{kExecutionMemoryBlockSize}; @@ -357,13 +255,38 @@ class Interpreter { void Commit(); void AdvanceCommand(); void AbortCommand(); - - // high level tree -> CachedPlan - std::shared_ptr CypherQueryToPlan(HashType query_hash, - CypherQuery *query, - AstStorage ast_storage, - const Parameters ¶meters, - DbAccessor *db_accessor); }; +template +std::map Interpreter::PullAll(TStream *result_stream) { + // If we don't have any results (eg. a transaction command preceeded), + // return an empty summary. + if (!prepared_query_) return {}; + + try { + // Wrap the (statically polymorphic) stream type into a common type which + // the handler knows. + AnyStream stream{result_stream, &execution_memory_}; + bool commit = prepared_query_->query_handler(&stream); + + if (!in_explicit_transaction_) { + if (commit) { + Commit(); + } else { + Abort(); + } + } + + return summary_; +#ifdef MG_SINGLE_NODE_HA + } catch (const query::HintedAbortError &) { + AbortCommand(); + throw utils::BasicException("Transaction was asked to abort."); +#endif + } catch (const utils::BasicException &) { + AbortCommand(); + throw; + } +} + } // namespace query