diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index d68774818..8ebe31425 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -9,6 +9,7 @@ add_custom_target(generate_lcp_query DEPENDS ${generated_lcp_query_files}) set(mg_query_sources ${lcp_query_cpp_files} common.cpp + cypher_query_interpreter.cpp dump.cpp frontend/ast/cypher_main_visitor.cpp frontend/ast/pretty_print.cpp @@ -30,6 +31,7 @@ set(mg_query_sources procedure/mg_procedure_impl.cpp procedure/module.cpp procedure/py_module.cpp + trigger.cpp typed_value.cpp) add_library(mg-query STATIC ${mg_query_sources}) diff --git a/src/query/cypher_query_interpreter.cpp b/src/query/cypher_query_interpreter.cpp new file mode 100644 index 000000000..d721f2c09 --- /dev/null +++ b/src/query/cypher_query_interpreter.cpp @@ -0,0 +1,135 @@ +#include "query/cypher_query_interpreter.hpp" + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner."); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, "Time to live for cached query plans, in seconds.", + FLAG_IN_RANGE(0, std::numeric_limits::max())); + +namespace query { +CachedPlan::CachedPlan(std::unique_ptr plan) : plan_(std::move(plan)) {} + +ParsedQuery ParseQuery(const std::string &query_string, const std::map ¶ms, + utils::SkipList *cache, utils::SpinLock *antlr_lock) { + // Strip the query for caching purposes. The process of stripping a query + // "normalizes" it by replacing any literals with new parameters. This + // results in just the *structure* of the query being taken into account for + // caching. + frontend::StrippedQuery stripped_query{query_string}; + + // Copy over the parameters that were introduced during stripping. + Parameters parameters{stripped_query.literals()}; + + // Check that all user-specified parameters are provided. + for (const auto ¶m_pair : stripped_query.parameters()) { + auto it = params.find(param_pair.second); + + if (it == params.end()) { + throw query::UnprovidedParameterError("Parameter ${} not provided.", param_pair.second); + } + + parameters.Add(param_pair.first, it->second); + } + + // Cache the query's AST if it isn't already. + auto hash = stripped_query.hash(); + auto accessor = cache->access(); + auto it = accessor.find(hash); + std::unique_ptr parser; + + // Return a copy of both the AST storage and the query. + CachedQuery result; + bool is_cacheable = true; + + auto get_information_from_cache = [&](const auto &cached_query) { + result.ast_storage.properties_ = cached_query.ast_storage.properties_; + result.ast_storage.labels_ = cached_query.ast_storage.labels_; + result.ast_storage.edge_types_ = cached_query.ast_storage.edge_types_; + + result.query = cached_query.query->Clone(&result.ast_storage); + result.required_privileges = cached_query.required_privileges; + }; + + if (it == accessor.end()) { + { + std::unique_lock guard(*antlr_lock); + + try { + parser = std::make_unique(stripped_query.query()); + } catch (const SyntaxException &e) { + // There is a syntax exception in the stripped query. Re-run the parser + // on the original query to get an appropriate error messsage. + parser = std::make_unique(query_string); + + // If an exception was not thrown here, the stripper messed something + // up. + LOG_FATAL("The stripped query can't be parsed, but the original can."); + } + } + + // Convert the ANTLR4 parse tree into an AST. + AstStorage ast_storage; + frontend::ParsingContext context{true}; + frontend::CypherMainVisitor visitor(context, &ast_storage); + + visitor.visit(parser->tree()); + + if (visitor.IsCacheable()) { + CachedQuery cached_query{std::move(ast_storage), visitor.query(), query::GetRequiredPrivileges(visitor.query())}; + it = accessor.insert({hash, std::move(cached_query)}).first; + + get_information_from_cache(it->second); + } else { + result.ast_storage.properties_ = ast_storage.properties_; + result.ast_storage.labels_ = ast_storage.labels_; + result.ast_storage.edge_types_ = ast_storage.edge_types_; + + result.query = visitor.query()->Clone(&result.ast_storage); + result.required_privileges = query::GetRequiredPrivileges(visitor.query()); + + is_cacheable = false; + } + } else { + get_information_from_cache(it->second); + } + + return ParsedQuery{query_string, + params, + std::move(parameters), + std::move(stripped_query), + std::move(result.ast_storage), + result.query, + std::move(result.required_privileges), + is_cacheable}; +} + +std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, + DbAccessor *db_accessor) { + auto vertex_counts = plan::MakeVertexCountCache(db_accessor); + auto symbol_table = MakeSymbolTable(query); + auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, query, &vertex_counts); + auto [root, cost] = plan::MakeLogicalPlan(&planning_context, parameters, FLAGS_query_cost_planner); + return std::make_unique(std::move(root), cost, std::move(ast_storage), + std::move(symbol_table)); +} + +std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, + const Parameters ¶meters, utils::SkipList *plan_cache, + DbAccessor *db_accessor, const bool is_cacheable) { + auto plan_cache_access = plan_cache->access(); + auto it = plan_cache_access.find(hash); + if (it != plan_cache_access.end()) { + if (it->second->IsExpired()) { + plan_cache_access.remove(hash); + } else { + return it->second; + } + } + + auto plan = std::make_shared(MakeLogicalPlan(std::move(ast_storage), (query), parameters, db_accessor)); + if (is_cacheable) { + plan_cache_access.insert({hash, plan}); + } + return plan; +} +} // namespace query diff --git a/src/query/cypher_query_interpreter.hpp b/src/query/cypher_query_interpreter.hpp new file mode 100644 index 000000000..eaf688521 --- /dev/null +++ b/src/query/cypher_query_interpreter.hpp @@ -0,0 +1,149 @@ +#pragma once + +////////////////////////////////////////////////////// +// THIS INCLUDE SHOULD ALWAYS COME BEFORE THE +// "cypher_main_visitor.hpp" +// "planner.hpp" includes json.hpp which uses libc's +// EOF macro while "cypher_main_visitor.hpp" includes +// "antlr4-runtime.h" which contains a static variable +// of the same name, EOF. +// This hides the definition of the macro which causes +// the compilation to fail. +#include "query/plan/planner.hpp" +////////////////////////////////////////////////////// +#include "query/frontend/ast/cypher_main_visitor.hpp" +#include "query/frontend/opencypher/parser.hpp" +#include "query/frontend/semantic/required_privileges.hpp" +#include "query/frontend/semantic/symbol_generator.hpp" +#include "query/frontend/stripped.hpp" +#include "utils/flag_validation.hpp" +#include "utils/timer.hpp" + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(query_cost_planner); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int32(query_plan_cache_ttl); + +namespace query { + +// TODO: Maybe this should move to query/plan/planner. +/// Interface for accessing the root operator of a logical plan. +class LogicalPlan { + public: + explicit LogicalPlan() = default; + + virtual ~LogicalPlan() = default; + + LogicalPlan(const LogicalPlan &) = default; + LogicalPlan &operator=(const LogicalPlan &) = default; + LogicalPlan(LogicalPlan &&) = default; + LogicalPlan &operator=(LogicalPlan &&) = default; + + virtual const plan::LogicalOperator &GetRoot() const = 0; + virtual double GetCost() const = 0; + virtual const SymbolTable &GetSymbolTable() const = 0; + virtual const AstStorage &GetAstStorage() const = 0; +}; + +class CachedPlan { + public: + explicit CachedPlan(std::unique_ptr plan); + + const auto &plan() const { return plan_->GetRoot(); } + double cost() const { return plan_->GetCost(); } + const auto &symbol_table() const { return plan_->GetSymbolTable(); } + const auto &ast_storage() const { return plan_->GetAstStorage(); } + + bool IsExpired() const { + // NOLINTNEXTLINE (modernize-use-nullptr) + return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_plan_cache_ttl); + }; + + private: + std::unique_ptr plan_; + utils::Timer cache_timer_; +}; + +struct CachedQuery { + AstStorage ast_storage; + Query *query; + std::vector required_privileges; +}; + +struct QueryCacheEntry { + bool operator==(const QueryCacheEntry &other) const { return first == other.first; } + bool operator<(const QueryCacheEntry &other) const { return first < other.first; } + bool operator==(const uint64_t &other) const { return first == other; } + bool operator<(const uint64_t &other) const { return first < other; } + + uint64_t first; + // TODO: Maybe store the query string here and use it as a key with the hash + // so that we eliminate the risk of hash collisions. + CachedQuery second; +}; + +struct PlanCacheEntry { + bool operator==(const PlanCacheEntry &other) const { return first == other.first; } + bool operator<(const PlanCacheEntry &other) const { return first < other.first; } + bool operator==(const uint64_t &other) const { return first == other; } + bool operator<(const uint64_t &other) const { return first < other; } + + uint64_t first; + // TODO: Maybe store the query string here and use it as a key with the hash + // so that we eliminate the risk of hash collisions. + std::shared_ptr second; +}; + +/** + * A container for data related to the parsing of a query. + */ +struct ParsedQuery { + std::string query_string; + std::map user_parameters; + Parameters parameters; + frontend::StrippedQuery stripped_query; + AstStorage ast_storage; + Query *query; + std::vector required_privileges; + bool is_cacheable{true}; +}; + +ParsedQuery ParseQuery(const std::string &query_string, const std::map ¶ms, + utils::SkipList *cache, utils::SpinLock *antlr_lock); + +class SingleNodeLogicalPlan final : public LogicalPlan { + public: + SingleNodeLogicalPlan(std::unique_ptr root, double cost, AstStorage storage, + const SymbolTable &symbol_table) + : root_(std::move(root)), cost_(cost), storage_(std::move(storage)), symbol_table_(symbol_table) {} + + const plan::LogicalOperator &GetRoot() const override { return *root_; } + double GetCost() const override { return cost_; } + const SymbolTable &GetSymbolTable() const override { return symbol_table_; } + const AstStorage &GetAstStorage() const override { return storage_; } + + private: + std::unique_ptr root_; + double cost_; + AstStorage storage_; + SymbolTable symbol_table_; +}; + +/** + * Convert a parsed *Cypher* query's AST into a logical plan. + * + * The created logical plan will take ownership of the `AstStorage` within + * `ParsedQuery` and might modify it during planning. + */ +std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, + DbAccessor *db_accessor); + +/** + * Return the parsed *Cypher* query's AST cached logical plan, or create and + * cache a fresh one if it doesn't yet exist. + */ +std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, + const Parameters ¶meters, utils::SkipList *plan_cache, + DbAccessor *db_accessor, bool is_cacheable = true); + +} // namespace query diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 91a070507..2c5ff75ed 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -31,10 +31,6 @@ #include "utils/string.hpp" #include "utils/tsc.hpp" -DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner."); -DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, "Time to live for cached query plans, in seconds.", - FLAG_IN_RANGE(0, std::numeric_limits::max())); - namespace EventCounter { extern Event ReadQuery; extern Event WriteQuery; @@ -62,135 +58,6 @@ void UpdateTypeCount(const plan::ReadWriteTypeChecker::RWType type) { break; } } -} // namespace - -/** - * A container for data related to the parsing of a query. - */ -struct ParsedQuery { - std::string query_string; - std::map user_parameters; - Parameters parameters; - frontend::StrippedQuery stripped_query; - AstStorage ast_storage; - Query *query; - std::vector required_privileges; - bool is_cacheable{true}; -}; - -ParsedQuery ParseQuery(const std::string &query_string, const std::map ¶ms, - utils::SkipList *cache, utils::SpinLock *antlr_lock) { - // Strip the query for caching purposes. The process of stripping a query - // "normalizes" it by replacing any literals with new parameters. This - // results in just the *structure* of the query being taken into account for - // caching. - frontend::StrippedQuery stripped_query{query_string}; - - // Copy over the parameters that were introduced during stripping. - Parameters parameters{stripped_query.literals()}; - - // Check that all user-specified parameters are provided. - for (const auto ¶m_pair : stripped_query.parameters()) { - auto it = params.find(param_pair.second); - - if (it == params.end()) { - throw query::UnprovidedParameterError("Parameter ${} not provided.", param_pair.second); - } - - parameters.Add(param_pair.first, it->second); - } - - // Cache the query's AST if it isn't already. - auto hash = stripped_query.hash(); - auto accessor = cache->access(); - auto it = accessor.find(hash); - std::unique_ptr parser; - - // Return a copy of both the AST storage and the query. - CachedQuery result; - bool is_cacheable = true; - - auto get_information_from_cache = [&](const auto &cached_query) { - result.ast_storage.properties_ = cached_query.ast_storage.properties_; - result.ast_storage.labels_ = cached_query.ast_storage.labels_; - result.ast_storage.edge_types_ = cached_query.ast_storage.edge_types_; - - result.query = cached_query.query->Clone(&result.ast_storage); - result.required_privileges = cached_query.required_privileges; - }; - - if (it == accessor.end()) { - { - std::unique_lock guard(*antlr_lock); - - try { - parser = std::make_unique(stripped_query.query()); - } catch (const SyntaxException &e) { - // There is a syntax exception in the stripped query. Re-run the parser - // on the original query to get an appropriate error messsage. - parser = std::make_unique(query_string); - - // If an exception was not thrown here, the stripper messed something - // up. - LOG_FATAL("The stripped query can't be parsed, but the original can."); - } - } - - // Convert the ANTLR4 parse tree into an AST. - AstStorage ast_storage; - frontend::ParsingContext context{true}; - frontend::CypherMainVisitor visitor(context, &ast_storage); - - visitor.visit(parser->tree()); - - if (visitor.IsCacheable()) { - CachedQuery cached_query{std::move(ast_storage), visitor.query(), query::GetRequiredPrivileges(visitor.query())}; - it = accessor.insert({hash, std::move(cached_query)}).first; - - get_information_from_cache(it->second); - } else { - result.ast_storage.properties_ = ast_storage.properties_; - result.ast_storage.labels_ = ast_storage.labels_; - result.ast_storage.edge_types_ = ast_storage.edge_types_; - - result.query = visitor.query()->Clone(&result.ast_storage); - result.required_privileges = query::GetRequiredPrivileges(visitor.query()); - - is_cacheable = false; - } - } else { - get_information_from_cache(it->second); - } - - return ParsedQuery{query_string, - params, - std::move(parameters), - std::move(stripped_query), - std::move(result.ast_storage), - result.query, - std::move(result.required_privileges), - is_cacheable}; -} - -class SingleNodeLogicalPlan final : public LogicalPlan { - public: - SingleNodeLogicalPlan(std::unique_ptr root, double cost, AstStorage storage, - const SymbolTable &symbol_table) - : root_(std::move(root)), cost_(cost), storage_(std::move(storage)), symbol_table_(symbol_table) {} - - const plan::LogicalOperator &GetRoot() const override { return *root_; } - double GetCost() const override { return cost_; } - const SymbolTable &GetSymbolTable() const override { return symbol_table_; } - const AstStorage &GetAstStorage() const override { return storage_; } - - private: - std::unique_ptr root_; - double cost_; - AstStorage storage_; - SymbolTable symbol_table_; -}; - -CachedPlan::CachedPlan(std::unique_ptr plan) : plan_(std::move(plan)) {} struct Callback { std::vector header; @@ -575,11 +442,6 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler * } } -Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { - MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); -} - -namespace { // Struct for lazy pulling from a vector struct PullPlanVector { explicit PullPlanVector(std::vector> values) : values_(std::move(values)) {} @@ -730,52 +592,13 @@ std::optional PullPlan::Pull(AnyStream *stream, std::optional< return ctx_; } +using RWType = plan::ReadWriteTypeChecker::RWType; } // namespace -/** - * Convert a parsed *Cypher* query's AST into a logical plan. - * - * The created logical plan will take ownership of the `AstStorage` within - * `ParsedQuery` and might modify it during planning. - */ -std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - DbAccessor *db_accessor) { - auto vertex_counts = plan::MakeVertexCountCache(db_accessor); - auto symbol_table = MakeSymbolTable(query); - auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, query, &vertex_counts); - std::unique_ptr root; - double cost; - std::tie(root, cost) = plan::MakeLogicalPlan(&planning_context, parameters, FLAGS_query_cost_planner); - return std::make_unique(std::move(root), cost, std::move(ast_storage), - std::move(symbol_table)); +Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { + MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); } -/** - * Return the parsed *Cypher* query's AST cached logical plan, or create and - * cache a fresh one if it doesn't yet exist. - */ -std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, - const Parameters ¶meters, utils::SkipList *plan_cache, - DbAccessor *db_accessor, const bool is_cacheable = true) { - auto plan_cache_access = plan_cache->access(); - auto it = plan_cache_access.find(hash); - if (it != plan_cache_access.end()) { - if (it->second->IsExpired()) { - plan_cache_access.remove(hash); - } else { - return it->second; - } - } - - auto plan = std::make_shared(MakeLogicalPlan(std::move(ast_storage), (query), parameters, db_accessor)); - if (is_cacheable) { - plan_cache_access.insert({hash, plan}); - } - return plan; -} - -using RWType = plan::ReadWriteTypeChecker::RWType; - PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) { std::function handler; @@ -1585,6 +1408,7 @@ void Interpreter::Commit() { // We should document clearly that all results should be pulled to complete // a query. if (!db_accessor_) return; + auto maybe_constraint_violation = db_accessor_->Commit(); if (maybe_constraint_violation.HasError()) { const auto &constraint_violation = maybe_constraint_violation.GetError(); @@ -1613,6 +1437,15 @@ void Interpreter::Commit() { } } } + + // Run the triggers + for (const auto &trigger : interpreter_context_->triggers.access()) { + utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize}; + trigger.Execute(&interpreter_context_->plan_cache, &*execution_db_accessor_, &execution_memory, + *interpreter_context_->tsc_frequency, interpreter_context_->execution_timeout_sec, + &interpreter_context_->is_shutting_down); + } + execution_db_accessor_ = std::nullopt; db_accessor_ = std::nullopt; } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 171691e03..a3ed8e323 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -3,6 +3,7 @@ #include #include "query/context.hpp" +#include "query/cypher_query_interpreter.hpp" #include "query/db_accessor.hpp" #include "query/exceptions.hpp" #include "query/frontend/ast/ast.hpp" @@ -12,6 +13,7 @@ #include "query/plan/operator.hpp" #include "query/plan/read_write_type_checker.hpp" #include "query/stream.hpp" +#include "query/trigger.hpp" #include "query/typed_value.hpp" #include "utils/event_counter.hpp" #include "utils/logging.hpp" @@ -21,9 +23,6 @@ #include "utils/timer.hpp" #include "utils/tsc.hpp" -DECLARE_bool(query_cost_planner); -DECLARE_int32(query_plan_cache_ttl); - namespace EventCounter { extern const Event FailedQuery; } // namespace EventCounter @@ -139,64 +138,6 @@ struct PreparedQuery { plan::ReadWriteTypeChecker::RWType rw_type; }; -// TODO: Maybe this should move to query/plan/planner. -/// Interface for accessing the root operator of a logical plan. -class LogicalPlan { - public: - virtual ~LogicalPlan() {} - - virtual const plan::LogicalOperator &GetRoot() const = 0; - virtual double GetCost() const = 0; - virtual const SymbolTable &GetSymbolTable() const = 0; - virtual const AstStorage &GetAstStorage() const = 0; -}; - -class CachedPlan { - public: - explicit CachedPlan(std::unique_ptr plan); - - const auto &plan() const { return plan_->GetRoot(); } - double cost() const { return plan_->GetCost(); } - const auto &symbol_table() const { return plan_->GetSymbolTable(); } - const auto &ast_storage() const { return plan_->GetAstStorage(); } - - bool IsExpired() const { return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_plan_cache_ttl); }; - - private: - std::unique_ptr plan_; - utils::Timer cache_timer_; -}; - -struct CachedQuery { - AstStorage ast_storage; - Query *query; - std::vector required_privileges; -}; - -struct QueryCacheEntry { - bool operator==(const QueryCacheEntry &other) const { return first == other.first; } - bool operator<(const QueryCacheEntry &other) const { return first < other.first; } - bool operator==(const uint64_t &other) const { return first == other; } - bool operator<(const uint64_t &other) const { return first < other; } - - uint64_t first; - // TODO: Maybe store the query string here and use it as a key with the hash - // so that we eliminate the risk of hash collisions. - CachedQuery second; -}; - -struct PlanCacheEntry { - bool operator==(const PlanCacheEntry &other) const { return first == other.first; } - bool operator<(const PlanCacheEntry &other) const { return first < other.first; } - bool operator==(const uint64_t &other) const { return first == other; } - bool operator<(const uint64_t &other) const { return first < other; } - - uint64_t first; - // TODO: Maybe store the query string here and use it as a key with the hash - // so that we eliminate the risk of hash collisions. - std::shared_ptr second; -}; - /** * Holds data shared between multiple `Interpreter` instances (which might be * running concurrently). @@ -205,7 +146,10 @@ struct PlanCacheEntry { * been passed to an `Interpreter` instance. */ struct InterpreterContext { - explicit InterpreterContext(storage::Storage *db) : db(db) {} + explicit InterpreterContext(storage::Storage *db) : db(db) { + // auto triggers_acc = triggers.access(); + // triggers_acc.insert(Trigger{"Creator", "CREATE (:CREATED)", &ast_cache, &antlr_lock}); + } storage::Storage *db; @@ -225,6 +169,9 @@ struct InterpreterContext { utils::SkipList ast_cache; utils::SkipList plan_cache; + + // use a thread safe container + utils::SkipList triggers; }; /// Function that is used to tell all active interpreters that they should stop diff --git a/src/query/trigger.cpp b/src/query/trigger.cpp new file mode 100644 index 000000000..03b86ff0f --- /dev/null +++ b/src/query/trigger.cpp @@ -0,0 +1,63 @@ +#include "query/trigger.hpp" +#include "query/context.hpp" +#include "query/db_accessor.hpp" +#include "query/frontend/ast/ast.hpp" +#include "query/interpret/frame.hpp" +#include "utils/memory.hpp" + +namespace query { +Trigger::Trigger(std::string name, std::string query, utils::SkipList *cache, + utils::SpinLock *antlr_lock) + : name_(std::move(name)), + parsed_statements_{ParseQuery(query, {} /* this should contain the predefined parameters */, cache, antlr_lock)} { +} + +void Trigger::Execute(utils::SkipList *plan_cache, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory, const double tsc_frequency, + const double max_execution_time_sec, std::atomic *is_shutting_down) const { + AstStorage ast_storage; + ast_storage.properties_ = parsed_statements_.ast_storage.properties_; + ast_storage.labels_ = parsed_statements_.ast_storage.labels_; + ast_storage.edge_types_ = parsed_statements_.ast_storage.edge_types_; + + auto plan = CypherQueryToPlan(parsed_statements_.stripped_query.hash(), std::move(ast_storage), + utils::Downcast(parsed_statements_.query), parsed_statements_.parameters, + plan_cache, dba, parsed_statements_.is_cacheable); + ExecutionContext ctx; + ctx.db_accessor = dba; + ctx.symbol_table = plan->symbol_table(); + ctx.evaluation_context.timestamp = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count(); + ctx.evaluation_context.parameters = parsed_statements_.parameters; + ctx.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, dba); + ctx.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba); + ctx.execution_tsc_timer = utils::TSCTimer(tsc_frequency); + ctx.max_execution_time_sec = max_execution_time_sec; + ctx.is_shutting_down = is_shutting_down; + ctx.is_profile_query = false; + + // Set up temporary memory for a single Pull. Initial memory comes from the + // stack. 256 KiB should fit on the stack and should be more than enough for a + // single `Pull`. + constexpr size_t stack_size = 256 * 1024; + char stack_data[stack_size]; + + // We can throw on every query because a simple queries for deleting will use only + // the stack allocated buffer. + // Also, we want to throw only when the query engine requests more memory and not the storage + // so we add the exception to the allocator. + utils::ResourceWithOutOfMemoryException resource_with_exception; + utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception); + // TODO (mferencevic): Tune the parameters accordingly. + utils::PoolResource pool_memory(128, 1024, &monotonic_memory); + ctx.evaluation_context.memory = &pool_memory; + + auto cursor = plan->plan().MakeCursor(execution_memory); + Frame frame{plan->symbol_table().max_position(), execution_memory}; + while (cursor->Pull(frame, ctx)) + ; + + cursor->Shutdown(); +} +} // namespace query diff --git a/src/query/trigger.hpp b/src/query/trigger.hpp new file mode 100644 index 000000000..8e15b38cb --- /dev/null +++ b/src/query/trigger.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include "query/cypher_query_interpreter.hpp" +#include "query/frontend/ast/ast.hpp" + +namespace query { +struct Trigger { + explicit Trigger(std::string name, std::string query, utils::SkipList *cache, + utils::SpinLock *antlr_lock); + + void Execute(utils::SkipList *plan_cache, DbAccessor *dba, + utils::MonotonicBufferResource *execution_memory, double tsc_frequency, double max_execution_time_sec, + std::atomic *is_shutting_down) const; + + bool operator==(const Trigger &other) const { return name_ == other.name_; } + // NOLINTNEXTLINE (modernize-use-nullptr) + bool operator<(const Trigger &other) const { return name_ < other.name_; } + bool operator==(const std::string &other) const { return name_ == other; } + // NOLINTNEXTLINE (modernize-use-nullptr) + bool operator<(const std::string &other) const { return name_ < other; } + + private: + std::string name_; + ParsedQuery parsed_statements_; +}; +} // namespace query