Initial trigger definition (#133)
* Pull out cypher query parsing logic * Define trigger structure * Run triggers before commit * Use skip list for saving triggers
This commit is contained in:
parent
2a0b0d969f
commit
7e44434cdf
@ -9,6 +9,7 @@ add_custom_target(generate_lcp_query DEPENDS ${generated_lcp_query_files})
|
||||
set(mg_query_sources
|
||||
${lcp_query_cpp_files}
|
||||
common.cpp
|
||||
cypher_query_interpreter.cpp
|
||||
dump.cpp
|
||||
frontend/ast/cypher_main_visitor.cpp
|
||||
frontend/ast/pretty_print.cpp
|
||||
@ -30,6 +31,7 @@ set(mg_query_sources
|
||||
procedure/mg_procedure_impl.cpp
|
||||
procedure/module.cpp
|
||||
procedure/py_module.cpp
|
||||
trigger.cpp
|
||||
typed_value.cpp)
|
||||
|
||||
add_library(mg-query STATIC ${mg_query_sources})
|
||||
|
135
src/query/cypher_query_interpreter.cpp
Normal file
135
src/query/cypher_query_interpreter.cpp
Normal file
@ -0,0 +1,135 @@
|
||||
#include "query/cypher_query_interpreter.hpp"
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner.");
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, "Time to live for cached query plans, in seconds.",
|
||||
FLAG_IN_RANGE(0, std::numeric_limits<int32_t>::max()));
|
||||
|
||||
namespace query {
|
||||
CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan) : plan_(std::move(plan)) {}
|
||||
|
||||
ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> ¶ms,
|
||||
utils::SkipList<QueryCacheEntry> *cache, utils::SpinLock *antlr_lock) {
|
||||
// Strip the query for caching purposes. The process of stripping a query
|
||||
// "normalizes" it by replacing any literals with new parameters. This
|
||||
// results in just the *structure* of the query being taken into account for
|
||||
// caching.
|
||||
frontend::StrippedQuery stripped_query{query_string};
|
||||
|
||||
// Copy over the parameters that were introduced during stripping.
|
||||
Parameters parameters{stripped_query.literals()};
|
||||
|
||||
// Check that all user-specified parameters are provided.
|
||||
for (const auto ¶m_pair : stripped_query.parameters()) {
|
||||
auto it = params.find(param_pair.second);
|
||||
|
||||
if (it == params.end()) {
|
||||
throw query::UnprovidedParameterError("Parameter ${} not provided.", param_pair.second);
|
||||
}
|
||||
|
||||
parameters.Add(param_pair.first, it->second);
|
||||
}
|
||||
|
||||
// Cache the query's AST if it isn't already.
|
||||
auto hash = stripped_query.hash();
|
||||
auto accessor = cache->access();
|
||||
auto it = accessor.find(hash);
|
||||
std::unique_ptr<frontend::opencypher::Parser> parser;
|
||||
|
||||
// Return a copy of both the AST storage and the query.
|
||||
CachedQuery result;
|
||||
bool is_cacheable = true;
|
||||
|
||||
auto get_information_from_cache = [&](const auto &cached_query) {
|
||||
result.ast_storage.properties_ = cached_query.ast_storage.properties_;
|
||||
result.ast_storage.labels_ = cached_query.ast_storage.labels_;
|
||||
result.ast_storage.edge_types_ = cached_query.ast_storage.edge_types_;
|
||||
|
||||
result.query = cached_query.query->Clone(&result.ast_storage);
|
||||
result.required_privileges = cached_query.required_privileges;
|
||||
};
|
||||
|
||||
if (it == accessor.end()) {
|
||||
{
|
||||
std::unique_lock<utils::SpinLock> guard(*antlr_lock);
|
||||
|
||||
try {
|
||||
parser = std::make_unique<frontend::opencypher::Parser>(stripped_query.query());
|
||||
} catch (const SyntaxException &e) {
|
||||
// There is a syntax exception in the stripped query. Re-run the parser
|
||||
// on the original query to get an appropriate error messsage.
|
||||
parser = std::make_unique<frontend::opencypher::Parser>(query_string);
|
||||
|
||||
// If an exception was not thrown here, the stripper messed something
|
||||
// up.
|
||||
LOG_FATAL("The stripped query can't be parsed, but the original can.");
|
||||
}
|
||||
}
|
||||
|
||||
// Convert the ANTLR4 parse tree into an AST.
|
||||
AstStorage ast_storage;
|
||||
frontend::ParsingContext context{true};
|
||||
frontend::CypherMainVisitor visitor(context, &ast_storage);
|
||||
|
||||
visitor.visit(parser->tree());
|
||||
|
||||
if (visitor.IsCacheable()) {
|
||||
CachedQuery cached_query{std::move(ast_storage), visitor.query(), query::GetRequiredPrivileges(visitor.query())};
|
||||
it = accessor.insert({hash, std::move(cached_query)}).first;
|
||||
|
||||
get_information_from_cache(it->second);
|
||||
} else {
|
||||
result.ast_storage.properties_ = ast_storage.properties_;
|
||||
result.ast_storage.labels_ = ast_storage.labels_;
|
||||
result.ast_storage.edge_types_ = ast_storage.edge_types_;
|
||||
|
||||
result.query = visitor.query()->Clone(&result.ast_storage);
|
||||
result.required_privileges = query::GetRequiredPrivileges(visitor.query());
|
||||
|
||||
is_cacheable = false;
|
||||
}
|
||||
} else {
|
||||
get_information_from_cache(it->second);
|
||||
}
|
||||
|
||||
return ParsedQuery{query_string,
|
||||
params,
|
||||
std::move(parameters),
|
||||
std::move(stripped_query),
|
||||
std::move(result.ast_storage),
|
||||
result.query,
|
||||
std::move(result.required_privileges),
|
||||
is_cacheable};
|
||||
}
|
||||
|
||||
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters,
|
||||
DbAccessor *db_accessor) {
|
||||
auto vertex_counts = plan::MakeVertexCountCache(db_accessor);
|
||||
auto symbol_table = MakeSymbolTable(query);
|
||||
auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, query, &vertex_counts);
|
||||
auto [root, cost] = plan::MakeLogicalPlan(&planning_context, parameters, FLAGS_query_cost_planner);
|
||||
return std::make_unique<SingleNodeLogicalPlan>(std::move(root), cost, std::move(ast_storage),
|
||||
std::move(symbol_table));
|
||||
}
|
||||
|
||||
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
|
||||
const Parameters ¶meters, utils::SkipList<PlanCacheEntry> *plan_cache,
|
||||
DbAccessor *db_accessor, const bool is_cacheable) {
|
||||
auto plan_cache_access = plan_cache->access();
|
||||
auto it = plan_cache_access.find(hash);
|
||||
if (it != plan_cache_access.end()) {
|
||||
if (it->second->IsExpired()) {
|
||||
plan_cache_access.remove(hash);
|
||||
} else {
|
||||
return it->second;
|
||||
}
|
||||
}
|
||||
|
||||
auto plan = std::make_shared<CachedPlan>(MakeLogicalPlan(std::move(ast_storage), (query), parameters, db_accessor));
|
||||
if (is_cacheable) {
|
||||
plan_cache_access.insert({hash, plan});
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
} // namespace query
|
149
src/query/cypher_query_interpreter.hpp
Normal file
149
src/query/cypher_query_interpreter.hpp
Normal file
@ -0,0 +1,149 @@
|
||||
#pragma once
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
// THIS INCLUDE SHOULD ALWAYS COME BEFORE THE
|
||||
// "cypher_main_visitor.hpp"
|
||||
// "planner.hpp" includes json.hpp which uses libc's
|
||||
// EOF macro while "cypher_main_visitor.hpp" includes
|
||||
// "antlr4-runtime.h" which contains a static variable
|
||||
// of the same name, EOF.
|
||||
// This hides the definition of the macro which causes
|
||||
// the compilation to fail.
|
||||
#include "query/plan/planner.hpp"
|
||||
//////////////////////////////////////////////////////
|
||||
#include "query/frontend/ast/cypher_main_visitor.hpp"
|
||||
#include "query/frontend/opencypher/parser.hpp"
|
||||
#include "query/frontend/semantic/required_privileges.hpp"
|
||||
#include "query/frontend/semantic/symbol_generator.hpp"
|
||||
#include "query/frontend/stripped.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_bool(query_cost_planner);
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DECLARE_int32(query_plan_cache_ttl);
|
||||
|
||||
namespace query {
|
||||
|
||||
// TODO: Maybe this should move to query/plan/planner.
|
||||
/// Interface for accessing the root operator of a logical plan.
|
||||
class LogicalPlan {
|
||||
public:
|
||||
explicit LogicalPlan() = default;
|
||||
|
||||
virtual ~LogicalPlan() = default;
|
||||
|
||||
LogicalPlan(const LogicalPlan &) = default;
|
||||
LogicalPlan &operator=(const LogicalPlan &) = default;
|
||||
LogicalPlan(LogicalPlan &&) = default;
|
||||
LogicalPlan &operator=(LogicalPlan &&) = default;
|
||||
|
||||
virtual const plan::LogicalOperator &GetRoot() const = 0;
|
||||
virtual double GetCost() const = 0;
|
||||
virtual const SymbolTable &GetSymbolTable() const = 0;
|
||||
virtual const AstStorage &GetAstStorage() const = 0;
|
||||
};
|
||||
|
||||
class CachedPlan {
|
||||
public:
|
||||
explicit CachedPlan(std::unique_ptr<LogicalPlan> plan);
|
||||
|
||||
const auto &plan() const { return plan_->GetRoot(); }
|
||||
double cost() const { return plan_->GetCost(); }
|
||||
const auto &symbol_table() const { return plan_->GetSymbolTable(); }
|
||||
const auto &ast_storage() const { return plan_->GetAstStorage(); }
|
||||
|
||||
bool IsExpired() const {
|
||||
// NOLINTNEXTLINE (modernize-use-nullptr)
|
||||
return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_plan_cache_ttl);
|
||||
};
|
||||
|
||||
private:
|
||||
std::unique_ptr<LogicalPlan> plan_;
|
||||
utils::Timer cache_timer_;
|
||||
};
|
||||
|
||||
struct CachedQuery {
|
||||
AstStorage ast_storage;
|
||||
Query *query;
|
||||
std::vector<AuthQuery::Privilege> required_privileges;
|
||||
};
|
||||
|
||||
struct QueryCacheEntry {
|
||||
bool operator==(const QueryCacheEntry &other) const { return first == other.first; }
|
||||
bool operator<(const QueryCacheEntry &other) const { return first < other.first; }
|
||||
bool operator==(const uint64_t &other) const { return first == other; }
|
||||
bool operator<(const uint64_t &other) const { return first < other; }
|
||||
|
||||
uint64_t first;
|
||||
// TODO: Maybe store the query string here and use it as a key with the hash
|
||||
// so that we eliminate the risk of hash collisions.
|
||||
CachedQuery second;
|
||||
};
|
||||
|
||||
struct PlanCacheEntry {
|
||||
bool operator==(const PlanCacheEntry &other) const { return first == other.first; }
|
||||
bool operator<(const PlanCacheEntry &other) const { return first < other.first; }
|
||||
bool operator==(const uint64_t &other) const { return first == other; }
|
||||
bool operator<(const uint64_t &other) const { return first < other; }
|
||||
|
||||
uint64_t first;
|
||||
// TODO: Maybe store the query string here and use it as a key with the hash
|
||||
// so that we eliminate the risk of hash collisions.
|
||||
std::shared_ptr<CachedPlan> second;
|
||||
};
|
||||
|
||||
/**
|
||||
* A container for data related to the parsing of a query.
|
||||
*/
|
||||
struct ParsedQuery {
|
||||
std::string query_string;
|
||||
std::map<std::string, storage::PropertyValue> user_parameters;
|
||||
Parameters parameters;
|
||||
frontend::StrippedQuery stripped_query;
|
||||
AstStorage ast_storage;
|
||||
Query *query;
|
||||
std::vector<AuthQuery::Privilege> required_privileges;
|
||||
bool is_cacheable{true};
|
||||
};
|
||||
|
||||
ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> ¶ms,
|
||||
utils::SkipList<QueryCacheEntry> *cache, utils::SpinLock *antlr_lock);
|
||||
|
||||
class SingleNodeLogicalPlan final : public LogicalPlan {
|
||||
public:
|
||||
SingleNodeLogicalPlan(std::unique_ptr<plan::LogicalOperator> root, double cost, AstStorage storage,
|
||||
const SymbolTable &symbol_table)
|
||||
: root_(std::move(root)), cost_(cost), storage_(std::move(storage)), symbol_table_(symbol_table) {}
|
||||
|
||||
const plan::LogicalOperator &GetRoot() const override { return *root_; }
|
||||
double GetCost() const override { return cost_; }
|
||||
const SymbolTable &GetSymbolTable() const override { return symbol_table_; }
|
||||
const AstStorage &GetAstStorage() const override { return storage_; }
|
||||
|
||||
private:
|
||||
std::unique_ptr<plan::LogicalOperator> root_;
|
||||
double cost_;
|
||||
AstStorage storage_;
|
||||
SymbolTable symbol_table_;
|
||||
};
|
||||
|
||||
/**
|
||||
* Convert a parsed *Cypher* query's AST into a logical plan.
|
||||
*
|
||||
* The created logical plan will take ownership of the `AstStorage` within
|
||||
* `ParsedQuery` and might modify it during planning.
|
||||
*/
|
||||
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters,
|
||||
DbAccessor *db_accessor);
|
||||
|
||||
/**
|
||||
* Return the parsed *Cypher* query's AST cached logical plan, or create and
|
||||
* cache a fresh one if it doesn't yet exist.
|
||||
*/
|
||||
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
|
||||
const Parameters ¶meters, utils::SkipList<PlanCacheEntry> *plan_cache,
|
||||
DbAccessor *db_accessor, bool is_cacheable = true);
|
||||
|
||||
} // namespace query
|
@ -31,10 +31,6 @@
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/tsc.hpp"
|
||||
|
||||
DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner.");
|
||||
DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, "Time to live for cached query plans, in seconds.",
|
||||
FLAG_IN_RANGE(0, std::numeric_limits<int32_t>::max()));
|
||||
|
||||
namespace EventCounter {
|
||||
extern Event ReadQuery;
|
||||
extern Event WriteQuery;
|
||||
@ -62,135 +58,6 @@ void UpdateTypeCount(const plan::ReadWriteTypeChecker::RWType type) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
/**
|
||||
* A container for data related to the parsing of a query.
|
||||
*/
|
||||
struct ParsedQuery {
|
||||
std::string query_string;
|
||||
std::map<std::string, storage::PropertyValue> user_parameters;
|
||||
Parameters parameters;
|
||||
frontend::StrippedQuery stripped_query;
|
||||
AstStorage ast_storage;
|
||||
Query *query;
|
||||
std::vector<AuthQuery::Privilege> required_privileges;
|
||||
bool is_cacheable{true};
|
||||
};
|
||||
|
||||
ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> ¶ms,
|
||||
utils::SkipList<QueryCacheEntry> *cache, utils::SpinLock *antlr_lock) {
|
||||
// Strip the query for caching purposes. The process of stripping a query
|
||||
// "normalizes" it by replacing any literals with new parameters. This
|
||||
// results in just the *structure* of the query being taken into account for
|
||||
// caching.
|
||||
frontend::StrippedQuery stripped_query{query_string};
|
||||
|
||||
// Copy over the parameters that were introduced during stripping.
|
||||
Parameters parameters{stripped_query.literals()};
|
||||
|
||||
// Check that all user-specified parameters are provided.
|
||||
for (const auto ¶m_pair : stripped_query.parameters()) {
|
||||
auto it = params.find(param_pair.second);
|
||||
|
||||
if (it == params.end()) {
|
||||
throw query::UnprovidedParameterError("Parameter ${} not provided.", param_pair.second);
|
||||
}
|
||||
|
||||
parameters.Add(param_pair.first, it->second);
|
||||
}
|
||||
|
||||
// Cache the query's AST if it isn't already.
|
||||
auto hash = stripped_query.hash();
|
||||
auto accessor = cache->access();
|
||||
auto it = accessor.find(hash);
|
||||
std::unique_ptr<frontend::opencypher::Parser> parser;
|
||||
|
||||
// Return a copy of both the AST storage and the query.
|
||||
CachedQuery result;
|
||||
bool is_cacheable = true;
|
||||
|
||||
auto get_information_from_cache = [&](const auto &cached_query) {
|
||||
result.ast_storage.properties_ = cached_query.ast_storage.properties_;
|
||||
result.ast_storage.labels_ = cached_query.ast_storage.labels_;
|
||||
result.ast_storage.edge_types_ = cached_query.ast_storage.edge_types_;
|
||||
|
||||
result.query = cached_query.query->Clone(&result.ast_storage);
|
||||
result.required_privileges = cached_query.required_privileges;
|
||||
};
|
||||
|
||||
if (it == accessor.end()) {
|
||||
{
|
||||
std::unique_lock<utils::SpinLock> guard(*antlr_lock);
|
||||
|
||||
try {
|
||||
parser = std::make_unique<frontend::opencypher::Parser>(stripped_query.query());
|
||||
} catch (const SyntaxException &e) {
|
||||
// There is a syntax exception in the stripped query. Re-run the parser
|
||||
// on the original query to get an appropriate error messsage.
|
||||
parser = std::make_unique<frontend::opencypher::Parser>(query_string);
|
||||
|
||||
// If an exception was not thrown here, the stripper messed something
|
||||
// up.
|
||||
LOG_FATAL("The stripped query can't be parsed, but the original can.");
|
||||
}
|
||||
}
|
||||
|
||||
// Convert the ANTLR4 parse tree into an AST.
|
||||
AstStorage ast_storage;
|
||||
frontend::ParsingContext context{true};
|
||||
frontend::CypherMainVisitor visitor(context, &ast_storage);
|
||||
|
||||
visitor.visit(parser->tree());
|
||||
|
||||
if (visitor.IsCacheable()) {
|
||||
CachedQuery cached_query{std::move(ast_storage), visitor.query(), query::GetRequiredPrivileges(visitor.query())};
|
||||
it = accessor.insert({hash, std::move(cached_query)}).first;
|
||||
|
||||
get_information_from_cache(it->second);
|
||||
} else {
|
||||
result.ast_storage.properties_ = ast_storage.properties_;
|
||||
result.ast_storage.labels_ = ast_storage.labels_;
|
||||
result.ast_storage.edge_types_ = ast_storage.edge_types_;
|
||||
|
||||
result.query = visitor.query()->Clone(&result.ast_storage);
|
||||
result.required_privileges = query::GetRequiredPrivileges(visitor.query());
|
||||
|
||||
is_cacheable = false;
|
||||
}
|
||||
} else {
|
||||
get_information_from_cache(it->second);
|
||||
}
|
||||
|
||||
return ParsedQuery{query_string,
|
||||
params,
|
||||
std::move(parameters),
|
||||
std::move(stripped_query),
|
||||
std::move(result.ast_storage),
|
||||
result.query,
|
||||
std::move(result.required_privileges),
|
||||
is_cacheable};
|
||||
}
|
||||
|
||||
class SingleNodeLogicalPlan final : public LogicalPlan {
|
||||
public:
|
||||
SingleNodeLogicalPlan(std::unique_ptr<plan::LogicalOperator> root, double cost, AstStorage storage,
|
||||
const SymbolTable &symbol_table)
|
||||
: root_(std::move(root)), cost_(cost), storage_(std::move(storage)), symbol_table_(symbol_table) {}
|
||||
|
||||
const plan::LogicalOperator &GetRoot() const override { return *root_; }
|
||||
double GetCost() const override { return cost_; }
|
||||
const SymbolTable &GetSymbolTable() const override { return symbol_table_; }
|
||||
const AstStorage &GetAstStorage() const override { return storage_; }
|
||||
|
||||
private:
|
||||
std::unique_ptr<plan::LogicalOperator> root_;
|
||||
double cost_;
|
||||
AstStorage storage_;
|
||||
SymbolTable symbol_table_;
|
||||
};
|
||||
|
||||
CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan) : plan_(std::move(plan)) {}
|
||||
|
||||
struct Callback {
|
||||
std::vector<std::string> header;
|
||||
@ -575,11 +442,6 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler *
|
||||
}
|
||||
}
|
||||
|
||||
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
|
||||
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
|
||||
}
|
||||
|
||||
namespace {
|
||||
// Struct for lazy pulling from a vector
|
||||
struct PullPlanVector {
|
||||
explicit PullPlanVector(std::vector<std::vector<TypedValue>> values) : values_(std::move(values)) {}
|
||||
@ -730,52 +592,13 @@ std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<
|
||||
return ctx_;
|
||||
}
|
||||
|
||||
using RWType = plan::ReadWriteTypeChecker::RWType;
|
||||
} // namespace
|
||||
|
||||
/**
|
||||
* Convert a parsed *Cypher* query's AST into a logical plan.
|
||||
*
|
||||
* The created logical plan will take ownership of the `AstStorage` within
|
||||
* `ParsedQuery` and might modify it during planning.
|
||||
*/
|
||||
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters,
|
||||
DbAccessor *db_accessor) {
|
||||
auto vertex_counts = plan::MakeVertexCountCache(db_accessor);
|
||||
auto symbol_table = MakeSymbolTable(query);
|
||||
auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, query, &vertex_counts);
|
||||
std::unique_ptr<plan::LogicalOperator> root;
|
||||
double cost;
|
||||
std::tie(root, cost) = plan::MakeLogicalPlan(&planning_context, parameters, FLAGS_query_cost_planner);
|
||||
return std::make_unique<SingleNodeLogicalPlan>(std::move(root), cost, std::move(ast_storage),
|
||||
std::move(symbol_table));
|
||||
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
|
||||
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the parsed *Cypher* query's AST cached logical plan, or create and
|
||||
* cache a fresh one if it doesn't yet exist.
|
||||
*/
|
||||
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
|
||||
const Parameters ¶meters, utils::SkipList<PlanCacheEntry> *plan_cache,
|
||||
DbAccessor *db_accessor, const bool is_cacheable = true) {
|
||||
auto plan_cache_access = plan_cache->access();
|
||||
auto it = plan_cache_access.find(hash);
|
||||
if (it != plan_cache_access.end()) {
|
||||
if (it->second->IsExpired()) {
|
||||
plan_cache_access.remove(hash);
|
||||
} else {
|
||||
return it->second;
|
||||
}
|
||||
}
|
||||
|
||||
auto plan = std::make_shared<CachedPlan>(MakeLogicalPlan(std::move(ast_storage), (query), parameters, db_accessor));
|
||||
if (is_cacheable) {
|
||||
plan_cache_access.insert({hash, plan});
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
using RWType = plan::ReadWriteTypeChecker::RWType;
|
||||
|
||||
PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) {
|
||||
std::function<void()> handler;
|
||||
|
||||
@ -1585,6 +1408,7 @@ void Interpreter::Commit() {
|
||||
// We should document clearly that all results should be pulled to complete
|
||||
// a query.
|
||||
if (!db_accessor_) return;
|
||||
|
||||
auto maybe_constraint_violation = db_accessor_->Commit();
|
||||
if (maybe_constraint_violation.HasError()) {
|
||||
const auto &constraint_violation = maybe_constraint_violation.GetError();
|
||||
@ -1613,6 +1437,15 @@ void Interpreter::Commit() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run the triggers
|
||||
for (const auto &trigger : interpreter_context_->triggers.access()) {
|
||||
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||
trigger.Execute(&interpreter_context_->plan_cache, &*execution_db_accessor_, &execution_memory,
|
||||
*interpreter_context_->tsc_frequency, interpreter_context_->execution_timeout_sec,
|
||||
&interpreter_context_->is_shutting_down);
|
||||
}
|
||||
|
||||
execution_db_accessor_ = std::nullopt;
|
||||
db_accessor_ = std::nullopt;
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "query/context.hpp"
|
||||
#include "query/cypher_query_interpreter.hpp"
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
@ -12,6 +13,7 @@
|
||||
#include "query/plan/operator.hpp"
|
||||
#include "query/plan/read_write_type_checker.hpp"
|
||||
#include "query/stream.hpp"
|
||||
#include "query/trigger.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "utils/event_counter.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
@ -21,9 +23,6 @@
|
||||
#include "utils/timer.hpp"
|
||||
#include "utils/tsc.hpp"
|
||||
|
||||
DECLARE_bool(query_cost_planner);
|
||||
DECLARE_int32(query_plan_cache_ttl);
|
||||
|
||||
namespace EventCounter {
|
||||
extern const Event FailedQuery;
|
||||
} // namespace EventCounter
|
||||
@ -139,64 +138,6 @@ struct PreparedQuery {
|
||||
plan::ReadWriteTypeChecker::RWType rw_type;
|
||||
};
|
||||
|
||||
// TODO: Maybe this should move to query/plan/planner.
|
||||
/// Interface for accessing the root operator of a logical plan.
|
||||
class LogicalPlan {
|
||||
public:
|
||||
virtual ~LogicalPlan() {}
|
||||
|
||||
virtual const plan::LogicalOperator &GetRoot() const = 0;
|
||||
virtual double GetCost() const = 0;
|
||||
virtual const SymbolTable &GetSymbolTable() const = 0;
|
||||
virtual const AstStorage &GetAstStorage() const = 0;
|
||||
};
|
||||
|
||||
class CachedPlan {
|
||||
public:
|
||||
explicit CachedPlan(std::unique_ptr<LogicalPlan> plan);
|
||||
|
||||
const auto &plan() const { return plan_->GetRoot(); }
|
||||
double cost() const { return plan_->GetCost(); }
|
||||
const auto &symbol_table() const { return plan_->GetSymbolTable(); }
|
||||
const auto &ast_storage() const { return plan_->GetAstStorage(); }
|
||||
|
||||
bool IsExpired() const { return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_plan_cache_ttl); };
|
||||
|
||||
private:
|
||||
std::unique_ptr<LogicalPlan> plan_;
|
||||
utils::Timer cache_timer_;
|
||||
};
|
||||
|
||||
struct CachedQuery {
|
||||
AstStorage ast_storage;
|
||||
Query *query;
|
||||
std::vector<AuthQuery::Privilege> required_privileges;
|
||||
};
|
||||
|
||||
struct QueryCacheEntry {
|
||||
bool operator==(const QueryCacheEntry &other) const { return first == other.first; }
|
||||
bool operator<(const QueryCacheEntry &other) const { return first < other.first; }
|
||||
bool operator==(const uint64_t &other) const { return first == other; }
|
||||
bool operator<(const uint64_t &other) const { return first < other; }
|
||||
|
||||
uint64_t first;
|
||||
// TODO: Maybe store the query string here and use it as a key with the hash
|
||||
// so that we eliminate the risk of hash collisions.
|
||||
CachedQuery second;
|
||||
};
|
||||
|
||||
struct PlanCacheEntry {
|
||||
bool operator==(const PlanCacheEntry &other) const { return first == other.first; }
|
||||
bool operator<(const PlanCacheEntry &other) const { return first < other.first; }
|
||||
bool operator==(const uint64_t &other) const { return first == other; }
|
||||
bool operator<(const uint64_t &other) const { return first < other; }
|
||||
|
||||
uint64_t first;
|
||||
// TODO: Maybe store the query string here and use it as a key with the hash
|
||||
// so that we eliminate the risk of hash collisions.
|
||||
std::shared_ptr<CachedPlan> second;
|
||||
};
|
||||
|
||||
/**
|
||||
* Holds data shared between multiple `Interpreter` instances (which might be
|
||||
* running concurrently).
|
||||
@ -205,7 +146,10 @@ struct PlanCacheEntry {
|
||||
* been passed to an `Interpreter` instance.
|
||||
*/
|
||||
struct InterpreterContext {
|
||||
explicit InterpreterContext(storage::Storage *db) : db(db) {}
|
||||
explicit InterpreterContext(storage::Storage *db) : db(db) {
|
||||
// auto triggers_acc = triggers.access();
|
||||
// triggers_acc.insert(Trigger{"Creator", "CREATE (:CREATED)", &ast_cache, &antlr_lock});
|
||||
}
|
||||
|
||||
storage::Storage *db;
|
||||
|
||||
@ -225,6 +169,9 @@ struct InterpreterContext {
|
||||
|
||||
utils::SkipList<QueryCacheEntry> ast_cache;
|
||||
utils::SkipList<PlanCacheEntry> plan_cache;
|
||||
|
||||
// use a thread safe container
|
||||
utils::SkipList<Trigger> triggers;
|
||||
};
|
||||
|
||||
/// Function that is used to tell all active interpreters that they should stop
|
||||
|
63
src/query/trigger.cpp
Normal file
63
src/query/trigger.cpp
Normal file
@ -0,0 +1,63 @@
|
||||
#include "query/trigger.hpp"
|
||||
#include "query/context.hpp"
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
#include "query/interpret/frame.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
|
||||
namespace query {
|
||||
Trigger::Trigger(std::string name, std::string query, utils::SkipList<QueryCacheEntry> *cache,
|
||||
utils::SpinLock *antlr_lock)
|
||||
: name_(std::move(name)),
|
||||
parsed_statements_{ParseQuery(query, {} /* this should contain the predefined parameters */, cache, antlr_lock)} {
|
||||
}
|
||||
|
||||
void Trigger::Execute(utils::SkipList<PlanCacheEntry> *plan_cache, DbAccessor *dba,
|
||||
utils::MonotonicBufferResource *execution_memory, const double tsc_frequency,
|
||||
const double max_execution_time_sec, std::atomic<bool> *is_shutting_down) const {
|
||||
AstStorage ast_storage;
|
||||
ast_storage.properties_ = parsed_statements_.ast_storage.properties_;
|
||||
ast_storage.labels_ = parsed_statements_.ast_storage.labels_;
|
||||
ast_storage.edge_types_ = parsed_statements_.ast_storage.edge_types_;
|
||||
|
||||
auto plan = CypherQueryToPlan(parsed_statements_.stripped_query.hash(), std::move(ast_storage),
|
||||
utils::Downcast<CypherQuery>(parsed_statements_.query), parsed_statements_.parameters,
|
||||
plan_cache, dba, parsed_statements_.is_cacheable);
|
||||
ExecutionContext ctx;
|
||||
ctx.db_accessor = dba;
|
||||
ctx.symbol_table = plan->symbol_table();
|
||||
ctx.evaluation_context.timestamp =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
ctx.evaluation_context.parameters = parsed_statements_.parameters;
|
||||
ctx.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, dba);
|
||||
ctx.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba);
|
||||
ctx.execution_tsc_timer = utils::TSCTimer(tsc_frequency);
|
||||
ctx.max_execution_time_sec = max_execution_time_sec;
|
||||
ctx.is_shutting_down = is_shutting_down;
|
||||
ctx.is_profile_query = false;
|
||||
|
||||
// Set up temporary memory for a single Pull. Initial memory comes from the
|
||||
// stack. 256 KiB should fit on the stack and should be more than enough for a
|
||||
// single `Pull`.
|
||||
constexpr size_t stack_size = 256 * 1024;
|
||||
char stack_data[stack_size];
|
||||
|
||||
// We can throw on every query because a simple queries for deleting will use only
|
||||
// the stack allocated buffer.
|
||||
// Also, we want to throw only when the query engine requests more memory and not the storage
|
||||
// so we add the exception to the allocator.
|
||||
utils::ResourceWithOutOfMemoryException resource_with_exception;
|
||||
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
|
||||
// TODO (mferencevic): Tune the parameters accordingly.
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
|
||||
ctx.evaluation_context.memory = &pool_memory;
|
||||
|
||||
auto cursor = plan->plan().MakeCursor(execution_memory);
|
||||
Frame frame{plan->symbol_table().max_position(), execution_memory};
|
||||
while (cursor->Pull(frame, ctx))
|
||||
;
|
||||
|
||||
cursor->Shutdown();
|
||||
}
|
||||
} // namespace query
|
26
src/query/trigger.hpp
Normal file
26
src/query/trigger.hpp
Normal file
@ -0,0 +1,26 @@
|
||||
#pragma once
|
||||
|
||||
#include "query/cypher_query_interpreter.hpp"
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
|
||||
namespace query {
|
||||
struct Trigger {
|
||||
explicit Trigger(std::string name, std::string query, utils::SkipList<QueryCacheEntry> *cache,
|
||||
utils::SpinLock *antlr_lock);
|
||||
|
||||
void Execute(utils::SkipList<PlanCacheEntry> *plan_cache, DbAccessor *dba,
|
||||
utils::MonotonicBufferResource *execution_memory, double tsc_frequency, double max_execution_time_sec,
|
||||
std::atomic<bool> *is_shutting_down) const;
|
||||
|
||||
bool operator==(const Trigger &other) const { return name_ == other.name_; }
|
||||
// NOLINTNEXTLINE (modernize-use-nullptr)
|
||||
bool operator<(const Trigger &other) const { return name_ < other.name_; }
|
||||
bool operator==(const std::string &other) const { return name_ == other; }
|
||||
// NOLINTNEXTLINE (modernize-use-nullptr)
|
||||
bool operator<(const std::string &other) const { return name_ < other; }
|
||||
|
||||
private:
|
||||
std::string name_;
|
||||
ParsedQuery parsed_statements_;
|
||||
};
|
||||
} // namespace query
|
Loading…
Reference in New Issue
Block a user