#pragma once #include #include "data_structures/concurrent/concurrent_map.hpp" #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" #include "query/context.hpp" #include "query/frontend/ast/ast.hpp" #include "query/frontend/stripped.hpp" #include "query/interpret/frame.hpp" #include "query/plan/operator.hpp" #include "utils/thread/sync.hpp" #include "utils/timer.hpp" DECLARE_bool(query_cost_planner); DECLARE_int32(query_plan_cache_ttl); namespace auth { class Auth; } // namespace auth namespace integrations::kafka { class Streams; } // namespace integrations::kafka namespace query { // TODO: Maybe this should move to query/plan/planner. /// Interface for accessing the root operator of a logical plan. class LogicalPlan { public: virtual ~LogicalPlan() {} virtual const plan::LogicalOperator &GetRoot() const = 0; virtual double GetCost() const = 0; virtual const SymbolTable &GetSymbolTable() const = 0; }; class Interpreter { private: class CachedPlan { public: CachedPlan(std::unique_ptr plan); const auto &plan() const { return plan_->GetRoot(); } double cost() const { return plan_->GetCost(); } const auto &symbol_table() const { return plan_->GetSymbolTable(); } bool IsExpired() const { return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_plan_cache_ttl); }; private: std::unique_ptr plan_; utils::Timer cache_timer_; }; struct CachedQuery { AstStorage ast_storage; Query *query; std::vector required_privileges; }; struct ParsedQuery { Query *query; std::vector required_privileges; }; using PlanCacheT = ConcurrentMap>; public: /** * Encapsulates all what's necessary for the interpretation of a query * into a single object that can be pulled (into the given Stream). */ class Results { friend Interpreter; Results(Context ctx, std::shared_ptr plan, std::unique_ptr cursor, std::vector output_symbols, std::vector header, std::map summary, std::vector privileges) : ctx_(std::move(ctx)), plan_(plan), cursor_(std::move(cursor)), frame_(ctx_.symbol_table_.max_position()), output_symbols_(output_symbols), header_(header), summary_(summary), privileges_(std::move(privileges)) {} public: Results(const Results &) = delete; Results(Results &&) = default; Results &operator=(const Results &) = delete; Results &operator=(Results &&) = default; /** * Make the interpreter perform a single Pull. Results (if they exists) are * pushed into the given stream. On first Pull the header is written to the * stream, on last the summary. * * @param stream - The stream to push the header, results and summary into. * @return - If this Results is eligible for another Pull. If Pulling * after `false` has been returned, the behavior is undefined. * @tparam TStream - Stream type. */ template bool Pull(TStream &stream) { utils::Timer timer; bool return_value = cursor_->Pull(frame_, ctx_); if (return_value && !output_symbols_.empty()) { std::vector values; values.reserve(output_symbols_.size()); for (const auto &symbol : output_symbols_) { values.emplace_back(frame_[symbol]); } stream.Result(values); } execution_time_ += timer.Elapsed().count(); if (!return_value) { summary_["plan_execution_time"] = execution_time_; cursor_->Shutdown(); } return return_value; } /** Calls Pull() until exhausted. */ template void PullAll(TStream &stream) { while (Pull(stream)) continue; } const std::vector &header() { return header_; } const std::map &summary() { return summary_; } const std::vector &privileges() { return privileges_; } private: Context ctx_; std::shared_ptr plan_; std::unique_ptr cursor_; Frame frame_; std::vector output_symbols_; std::vector header_; std::map summary_; double execution_time_{0}; std::vector privileges_; }; Interpreter() = default; Interpreter(const Interpreter &) = delete; Interpreter &operator=(const Interpreter &) = delete; Interpreter(Interpreter &&) = delete; Interpreter &operator=(Interpreter &&) = delete; virtual ~Interpreter() {} /** * Generates an Results object for the parameters. The resulting object * can be Pulled with its results written to an arbitrary stream. */ Results operator()(const std::string &query, database::GraphDbAccessor &db_accessor, const std::map ¶ms, bool in_explicit_transaction); auth::Auth *auth_ = nullptr; integrations::kafka::Streams *kafka_streams_ = nullptr; protected: // high level tree -> logical plan // AstStorage and SymbolTable may be modified during planning. The created // LogicalPlan must take ownership of AstStorage and SymbolTable. virtual std::unique_ptr MakeLogicalPlan( CypherQuery *, AstStorage, const Parameters &, database::GraphDbAccessor *); virtual void PrettyPrintPlan(const database::GraphDbAccessor &, const plan::LogicalOperator *, std::ostream *); private: ConcurrentMap ast_cache_; PlanCacheT plan_cache_; // Antlr has singleton instance that is shared between threads. It is // protected by locks inside of antlr. Unfortunately, they are not protected // in a very good way. Once we have antlr version without race conditions we // can remove this lock. This will probably never happen since antlr // developers introduce more bugs in each version. Fortunately, we have cache // so this lock probably won't impact performance much... utils::SpinLock antlr_lock_; // high level tree -> CachedPlan std::shared_ptr CypherQueryToPlan( HashType query_hash, CypherQuery *query, AstStorage ast_storage, const Parameters ¶meters, database::GraphDbAccessor *db_accessor); // stripped query -> high level tree ParsedQuery ParseQuery(const std::string &stripped_query, const std::string &original_query, const ParsingContext &context, AstStorage *ast_storage, database::GraphDbAccessor *db_accessor); }; } // namespace query