Remove Results from the Interpreter

Summary:
- Add the `AnyStream` wrapper
- Remove the `Results` struct and store a function (handler) instead

Reviewers: teon.banek, mferencevic
Reviewed By: teon.banek, mferencevic
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D2497

parent 283a91cc60
commit bf03c57181
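
For orientation, the pattern this commit introduces can be shown in a minimal, self-contained sketch: a type-erased `AnyStream` plus a `PreparedQuery` that carries a handler function instead of a `Results` object. The class and member names mirror the diff below, but the allocator plumbing and `TypedValue` rows are simplified, and `StdoutStream`/`main` are hypothetical stand-ins rather than Memgraph code.

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Type-erased stream: any type with a Result(row) member can be wrapped.
class AnyStream final {
 public:
  template <class TStream>
  explicit AnyStream(TStream *stream)
      : content_(std::make_unique<Wrapper<TStream>>(stream)) {}

  void Result(const std::vector<std::string> &row) { content_->Result(row); }

 private:
  struct Concept {
    virtual ~Concept() = default;
    virtual void Result(const std::vector<std::string> &row) = 0;
  };
  template <class TStream>
  struct Wrapper final : Concept {
    explicit Wrapper(TStream *stream) : stream_(stream) {}
    void Result(const std::vector<std::string> &row) override {
      stream_->Result(row);
    }
    TStream *stream_;
  };
  std::unique_ptr<Concept> content_;
};

// What the interpreter now keeps around instead of a Results object.
struct PreparedQuery {
  std::vector<std::string> header;
  std::function<bool(AnyStream *stream)> query_handler;
};

// A toy stream type standing in for whatever the caller streams results into.
struct StdoutStream {
  void Result(const std::vector<std::string> &row) {
    for (const auto &value : row) std::cout << value << ' ';
    std::cout << '\n';
  }
};

int main() {
  PreparedQuery prepared{{"greeting"}, [](AnyStream *stream) {
                           stream->Result({"hello"});
                           return true;  // "eligible for another pull"
                         }};
  StdoutStream out;
  AnyStream stream(&out);
  prepared.query_handler(&stream);
}

In the diff itself, each Prepare* function builds such a handler around PullAllPlan; whatever concrete stream the caller has is wrapped into an `AnyStream` before the handler runs.
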
@@ -187,17 +187,6 @@ class SingleNodeLogicalPlan final : public LogicalPlan {
 CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan)
     : plan_(std::move(plan)) {}

-void Interpreter::PrettyPrintPlan(const DbAccessor &dba,
-                                  const plan::LogicalOperator *plan_root,
-                                  std::ostream *out) {
-  plan::PrettyPrint(dba, plan_root, out);
-}
-
-std::string Interpreter::PlanToJson(const DbAccessor &dba,
-                                    const plan::LogicalOperator *plan_root) {
-  return plan::PlanToJson(dba, plan_root).dump();
-}
-
 struct Callback {
   std::vector<std::string> header;
   std::function<std::vector<std::vector<TypedValue>>()> fn;
@@ -768,8 +757,7 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context)
 std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>>
 Interpreter::Interpret(const std::string &query,
                        const std::map<std::string, PropertyValue> &params) {
-  // Clear pending results.
-  results_ = std::nullopt;
+  prepared_query_ = std::nullopt;
   execution_memory_.Release();

   // Check the query for transaction commands.
@@ -820,329 +808,596 @@ Interpreter::Interpret(const std::string &query,
     execution_db_accessor_.emplace(&*db_accessor_);
   }

-  // Clear leftover results.
-  results_ = std::nullopt;
+  prepared_query_ = std::nullopt;
   execution_memory_.Release();

-  // Interpret the query and return the headers.
+  // Prepare the query and return the headers.
   try {
-    results_.emplace(Prepare(query, params, &*execution_db_accessor_));
-    return {results_->header(), results_->privileges()};
+    Prepare(query, params);
+    return {prepared_query_->header, prepared_query_->privileges};
   } catch (const utils::BasicException &) {
     AbortCommand();
     throw;
   }
 }

-Interpreter::Results Interpreter::Prepare(
-    const std::string &query_string,
-    const std::map<std::string, PropertyValue> &params,
-    DbAccessor *db_accessor) {
-  std::map<std::string, TypedValue> summary;
+ExecutionContext PullAllPlan(AnyStream *stream, const CachedPlan &plan,
+                             const Parameters &parameters,
+                             const std::vector<Symbol> &output_symbols,
+                             bool is_profile_query,
+                             std::map<std::string, TypedValue> *summary,
+                             DbAccessor *dba,
+                             utils::MonotonicBufferResource *execution_memory) {
+  auto cursor = plan.plan().MakeCursor(execution_memory);
+  Frame frame(plan.symbol_table().max_position(), execution_memory);

-  utils::Timer parsing_timer;
-  ParsedQuery parsed_query =
-      ParseQuery(query_string, params, &interpreter_context_->ast_cache,
-                 &interpreter_context_->antlr_lock);
-  auto parsing_time = parsing_timer.Elapsed();
-  summary["parsing_time"] = parsing_time.count();
-  // TODO: set summary['type'] based on transaction metadata
-  // the type can't be determined based only on top level LogicalOp
-  // (for example MATCH DELETE RETURN will have Produce as it's top).
-  // For now always use "rw" because something must be set, but it doesn't
-  // have to be correct (for Bolt clients).
-  summary["type"] = "rw";
+  // Set up temporary memory for a single Pull. Initial memory comes from the
+  // stack. 256 KiB should fit on the stack and should be more than enough for a
+  // single `Pull`.
+  constexpr size_t stack_size = 256 * 1024;
+  char stack_data[stack_size];

-  utils::Timer planning_timer;
+  ExecutionContext ctx;
+  ctx.db_accessor = dba;
+  ctx.symbol_table = plan.symbol_table();
+  ctx.evaluation_context.timestamp =
+      std::chrono::duration_cast<std::chrono::milliseconds>(
+          std::chrono::system_clock::now().time_since_epoch())
+          .count();
+  ctx.evaluation_context.parameters = parameters;
+  ctx.evaluation_context.properties =
+      NamesToProperties(plan.ast_storage().properties_, dba);
+  ctx.evaluation_context.labels =
+      NamesToLabels(plan.ast_storage().labels_, dba);
+  ctx.is_profile_query = is_profile_query;

-  // This local shared_ptr might be the only owner of the CachedPlan, so
-  // we must ensure it lives during the whole interpretation.
-  std::shared_ptr<CachedPlan> plan{nullptr};
+  utils::Timer timer;

-#ifdef MG_SINGLE_NODE_HA
-  {
-    InfoQuery *info_query = nullptr;
-    if (!db_accessor->raft()->IsLeader() &&
-        (!(info_query = utils::Downcast<InfoQuery>(parsed_query.query)) ||
-         info_query->info_type_ != InfoQuery::InfoType::RAFT)) {
-      throw raft::CantExecuteQueries();
-    }
-  }
-#endif
+  while (true) {
+    utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size);
+    // TODO (mferencevic): Tune the parameters accordingly.
+    utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
+    ctx.evaluation_context.memory = &pool_memory;

-  if (auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query)) {
-    plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), cypher_query,
-                             std::move(parsed_query.ast_storage),
-                             parsed_query.parameters, db_accessor);
-    auto planning_time = planning_timer.Elapsed();
-    summary["planning_time"] = planning_time.count();
-    summary["cost_estimate"] = plan->cost();
+    if (!cursor->Pull(frame, ctx)) {
+      break;

-    auto output_symbols = plan->plan().OutputSymbols(plan->symbol_table());

-    std::vector<std::string> header;
-    header.reserve(output_symbols.size());

-    for (const auto &symbol : output_symbols) {
-      // When the symbol is aliased or expanded from '*' (inside RETURN or
-      // WITH), then there is no token position, so use symbol name.
-      // Otherwise, find the name from stripped query.
-      header.push_back(
-          utils::FindOr(parsed_query.stripped_query.named_expressions(),
-                        symbol.token_position(), symbol.name())
-              .first);
     }

-    return Results(db_accessor, parsed_query.parameters, plan,
-                   std::move(output_symbols), std::move(header),
-                   std::move(summary), parsed_query.required_privileges,
-                   &execution_memory_);
-  }
+    if (!output_symbols.empty()) {
+      // TODO: The streamed values should also probably use the above memory.
+      std::vector<TypedValue> values;
+      values.reserve(output_symbols.size());

-  if (utils::IsSubtype(*parsed_query.query, ExplainQuery::kType)) {
-    const std::string kExplainQueryStart = "explain ";
-    CHECK(utils::StartsWith(
-        utils::ToLowerCase(parsed_query.stripped_query.query()),
-        kExplainQueryStart))
-        << "Expected stripped query to start with '" << kExplainQueryStart
-        << "'";
+      for (const auto &symbol : output_symbols) {
+        values.emplace_back(frame[symbol]);

-    // We want to cache the Cypher query that appears within this "metaquery".
-    // However, we can't just use the hash of that Cypher query string (as the
-    // cache key) but then continue to use the AST that was constructed with the
-    // full string. The parameters within the AST are looked up using their
-    // token positions, which depend on the query string as they're computed at
-    // the time the query string is parsed. So, for example, if one first runs
-    // EXPLAIN (or PROFILE) on a Cypher query and *then* runs the same Cypher
-    // query standalone, the second execution will crash because the cached AST
-    // (constructed using the first query string but cached using the substring
-    // (equivalent to the second query string)) will use the old token
-    // positions. For that reason, we fully strip and parse the substring as
-    // well.
-    //
-    // Note that the stripped subquery string's hash will be equivalent to the
-    // hash of the stripped query as if it was run standalone. This guarantees
-    // that we will reuse any cached plans from before, rather than create a new
-    // one every time. This is important because the planner takes the values of
-    // the query parameters into account when planning and might produce a
-    // totally different plan if we were to create a new one right now. Doing so
-    // would result in discrepancies between the explained (or profiled) plan
-    // and the one that's executed when the query is ran standalone.
-    ParsedQuery parsed_query = ParseQuery(
-        query_string.substr(kExplainQueryStart.size()), params,
-        &interpreter_context_->ast_cache, &interpreter_context_->antlr_lock);
-    auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
-    CHECK(cypher_query)
-        << "Cypher grammar should not allow other queries in EXPLAIN";
-    std::shared_ptr<CachedPlan> cypher_query_plan =
-        CypherQueryToPlan(parsed_query.stripped_query.hash(), cypher_query,
-                          std::move(parsed_query.ast_storage),
-                          parsed_query.parameters, db_accessor);

-    std::stringstream printed_plan;
-    PrettyPrintPlan(*db_accessor, &cypher_query_plan->plan(), &printed_plan);

-    std::vector<std::vector<TypedValue>> printed_plan_rows;
-    for (const auto &row :
-         utils::Split(utils::RTrim(printed_plan.str()), "\n")) {
-      printed_plan_rows.push_back(std::vector<TypedValue>{TypedValue(row)});
-    }

-    summary["explain"] = PlanToJson(*db_accessor, &cypher_query_plan->plan());

-    SymbolTable symbol_table;
-    auto query_plan_symbol = symbol_table.CreateSymbol("QUERY PLAN", false);
-    std::vector<Symbol> output_symbols{query_plan_symbol};

-    auto output_plan =
-        std::make_unique<plan::OutputTable>(output_symbols, printed_plan_rows);

-    plan = std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
-        std::move(output_plan), 0.0, AstStorage{}, symbol_table));

-    auto planning_time = planning_timer.Elapsed();
-    summary["planning_time"] = planning_time.count();

-    std::vector<std::string> header{query_plan_symbol.name()};

-    return Results(db_accessor, parsed_query.parameters, plan,
-                   std::move(output_symbols), std::move(header),
-                   std::move(summary), parsed_query.required_privileges,
-                   &execution_memory_);
-  }

-  if (utils::IsSubtype(*parsed_query.query, ProfileQuery::kType)) {
-    const std::string kProfileQueryStart = "profile ";
-    CHECK(utils::StartsWith(
-        utils::ToLowerCase(parsed_query.stripped_query.query()),
-        kProfileQueryStart))
-        << "Expected stripped query to start with '" << kProfileQueryStart
-        << "'";

-    if (in_explicit_transaction_) {
-      throw ProfileInMulticommandTxException();
-    }

-    if (!interpreter_context_->is_tsc_available) {
-      throw QueryException("TSC support is missing for PROFILE");
-    }

-    // See the comment regarding the caching of Cypher queries within
-    // "metaqueries" for explain queries
-    ParsedQuery parsed_query = ParseQuery(
-        query_string.substr(kProfileQueryStart.size()), params,
-        &interpreter_context_->ast_cache, &interpreter_context_->antlr_lock);
-    auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
-    CHECK(cypher_query)
-        << "Cypher grammar should not allow other queries in PROFILE";
-    auto cypher_query_plan =
-        CypherQueryToPlan(parsed_query.stripped_query.hash(), cypher_query,
-                          std::move(parsed_query.ast_storage),
-                          parsed_query.parameters, db_accessor);

-    // Copy the symbol table and add our own symbols (used by the `OutputTable`
-    // operator below)
-    SymbolTable symbol_table(cypher_query_plan->symbol_table());

-    auto operator_symbol = symbol_table.CreateSymbol("OPERATOR", false);
-    auto actual_hits_symbol = symbol_table.CreateSymbol("ACTUAL HITS", false);
-    auto relative_time_symbol =
-        symbol_table.CreateSymbol("RELATIVE TIME", false);
-    auto absolute_time_symbol =
-        symbol_table.CreateSymbol("ABSOLUTE TIME", false);

-    std::vector<Symbol> output_symbols = {operator_symbol, actual_hits_symbol,
-                                          relative_time_symbol,
-                                          absolute_time_symbol};
-    std::vector<std::string> header{
-        operator_symbol.name(), actual_hits_symbol.name(),
-        relative_time_symbol.name(), absolute_time_symbol.name()};

-    auto output_plan = std::make_unique<plan::OutputTable>(
-        output_symbols,
-        [cypher_query_plan](Frame *frame, ExecutionContext *context) {
-          utils::MonotonicBufferResource execution_memory(1 * 1024 * 1024);
-          auto cursor = cypher_query_plan->plan().MakeCursor(&execution_memory);

-          // We are pulling from another plan, so set up the EvaluationContext
-          // correctly. The rest of the context should be good for sharing.
-          context->evaluation_context.properties =
-              NamesToProperties(cypher_query_plan->ast_storage().properties_,
-                                context->db_accessor);
-          context->evaluation_context.labels = NamesToLabels(
-              cypher_query_plan->ast_storage().labels_, context->db_accessor);

-          // Pull everything to profile the execution
-          utils::Timer timer;
-          while (cursor->Pull(*frame, *context)) continue;
-          auto execution_time = timer.Elapsed();

-          context->profile_execution_time = execution_time;

-          return ProfilingStatsToTable(context->stats, execution_time);
-        });

-    plan = std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
-        std::move(output_plan), 0.0, AstStorage{}, symbol_table));

-    auto planning_time = planning_timer.Elapsed();
-    summary["planning_time"] = planning_time.count();

-    return Results(db_accessor, parsed_query.parameters, plan,
-                   std::move(output_symbols), std::move(header),
-                   std::move(summary), parsed_query.required_privileges,
-                   &execution_memory_,
-                   /* is_profile_query */ true, /* should_abort_query */ true);
-  }

-  if (auto *dump_query = utils::Downcast<DumpQuery>(parsed_query.query)) {
-#ifndef MG_SINGLE_NODE_HA
-    SymbolTable symbol_table;
-    auto query_symbol = symbol_table.CreateSymbol("QUERY", false);

-    std::vector<Symbol> output_symbols = {query_symbol};
-    std::vector<std::string> header = {query_symbol.name()};

-    auto output_plan = std::make_unique<plan::OutputTableStream>(
-        output_symbols, DumpClosure(db_accessor));
-    plan = std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
-        std::move(output_plan), 0.0, AstStorage{}, symbol_table));

-    summary["planning_time"] = planning_timer.Elapsed().count();

-    return Results(db_accessor, parsed_query.parameters, plan,
-                   std::move(output_symbols), std::move(header),
-                   std::move(summary), parsed_query.required_privileges,
-                   &execution_memory_,
-                   /* is_profile_query */ false,
-                   /* should_abort_query */ false);
-#else
-    throw utils::NotYetImplemented("Dump database");
-#endif
-  }

-  Callback callback;
-  if (auto *index_query = utils::Downcast<IndexQuery>(parsed_query.query)) {
-    if (in_explicit_transaction_) {
-      throw IndexInMulticommandTxException();
-    }
-    // Creating an index influences computed plan costs.
-    auto invalidate_plan_cache = [plan_cache =
-                                      &this->interpreter_context_->plan_cache] {
-      auto access = plan_cache->access();
-      for (auto &kv : access) {
-        access.remove(kv.first);
       }
-    };
-    callback =
-        HandleIndexQuery(index_query, invalidate_plan_cache, db_accessor);
-  } else if (auto *auth_query =
-                 utils::Downcast<AuthQuery>(parsed_query.query)) {
-#ifdef MG_SINGLE_NODE_HA
-    throw utils::NotYetImplemented(
-        "Managing user privileges is not yet supported in Memgraph HA "
-        "instance.");
-#else
-    if (in_explicit_transaction_) {
-      throw UserModificationInMulticommandTxException();
+      stream->Result(values);
     }
-    callback = HandleAuthQuery(auth_query, interpreter_context_->auth,
-                               parsed_query.parameters, db_accessor);
-#endif
-  } else if (auto *info_query =
-                 utils::Downcast<InfoQuery>(parsed_query.query)) {
-    callback = HandleInfoQuery(info_query, db_accessor);
-  } else if (auto *constraint_query =
-                 utils::Downcast<ConstraintQuery>(parsed_query.query)) {
-    callback = HandleConstraintQuery(constraint_query, db_accessor);
-  } else {
-    LOG(FATAL) << "Should not get here -- unknown query type!";
   }

+  summary->insert_or_assign("plan_execution_time", timer.Elapsed().count());
+  cursor->Shutdown();

+  return ctx;
+}

+/**
+ * Convert a parsed *Cypher* query's AST into a logical plan.
+ *
+ * The created logical plan will take ownership of the `AstStorage` within
+ * `ParsedQuery` and might modify it during planning.
+ */
+std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage,
+                                             CypherQuery *query,
+                                             const Parameters &parameters,
+                                             DbAccessor *db_accessor) {
+  auto vertex_counts = plan::MakeVertexCountCache(db_accessor);
+  auto symbol_table = MakeSymbolTable(query);
+  auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table,
+                                                    query, &vertex_counts);
+  std::unique_ptr<plan::LogicalOperator> root;
+  double cost;
+  std::tie(root, cost) = plan::MakeLogicalPlan(&planning_context, parameters,
+                                               FLAGS_query_cost_planner);
+  return std::make_unique<SingleNodeLogicalPlan>(
+      std::move(root), cost, std::move(ast_storage), std::move(symbol_table));
+}

+/**
+ * Return the parsed *Cypher* query's AST cached logical plan, or create and
+ * cache a fresh one if it doesn't yet exist.
+ */
+std::shared_ptr<CachedPlan> CypherQueryToPlan(
+    HashType hash, AstStorage ast_storage, CypherQuery *query,
+    const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache,
+    DbAccessor *db_accessor) {
+  auto plan_cache_access = plan_cache->access();
+  auto it = plan_cache_access.find(hash);
+  if (it != plan_cache_access.end()) {
+    if (it->second->IsExpired()) {
+      plan_cache_access.remove(hash);
+    } else {
+      return it->second;
+    }
+  }
+  return plan_cache_access
+      .insert({hash,
+               std::make_shared<CachedPlan>(MakeLogicalPlan(
+                   std::move(ast_storage), (query), parameters, db_accessor))})
+      .first->second;
+}

+PreparedQuery PrepareCypherQuery(
+    ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
+    InterpreterContext *interpreter_context, DbAccessor *dba,
+    utils::MonotonicBufferResource *execution_memory) {
+  auto plan = CypherQueryToPlan(
+      parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage),
+      utils::Downcast<CypherQuery>(parsed_query.query), parsed_query.parameters,
+      &interpreter_context->plan_cache, dba);

+  summary->insert_or_assign("cost_estimate", plan->cost());

+  auto output_symbols = plan->plan().OutputSymbols(plan->symbol_table());

+  std::vector<std::string> header;
+  header.reserve(output_symbols.size());

+  for (const auto &symbol : output_symbols) {
+    // When the symbol is aliased or expanded from '*' (inside RETURN or
+    // WITH), then there is no token position, so use symbol name.
+    // Otherwise, find the name from stripped query.
+    header.push_back(
+        utils::FindOr(parsed_query.stripped_query.named_expressions(),
+                      symbol.token_position(), symbol.name())
+            .first);
+  }

+  return PreparedQuery{
+      std::move(header), std::move(parsed_query.required_privileges),
+      [plan = std::move(plan), parameters = std::move(parsed_query.parameters),
+       output_symbols = std::move(output_symbols), summary, dba,
+       execution_memory](AnyStream *stream) {
+        PullAllPlan(stream, *plan, parameters, output_symbols, false, summary,
+                    dba, execution_memory);
+        return true;
+      }};
+}

+PreparedQuery PrepareExplainQuery(
+    ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
+    InterpreterContext *interpreter_context, DbAccessor *dba,
+    utils::MonotonicBufferResource *execution_memory) {
+  const std::string kExplainQueryStart = "explain ";

+  CHECK(
+      utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()),
+                        kExplainQueryStart))
+      << "Expected stripped query to start with '" << kExplainQueryStart << "'";

+  // Parse and cache the inner query separately (as if it was a standalone
+  // query), producing a fresh AST. Note that currently we cannot just reuse
+  // part of the already produced AST because the parameters within ASTs are
+  // looked up using their positions within the string that was parsed. These
+  // wouldn't match up if if we were to reuse the AST (produced by parsing the
+  // full query string) when given just the inner query to execute.
+  ParsedQuery parsed_inner_query =
+      ParseQuery(parsed_query.query_string.substr(kExplainQueryStart.size()),
+                 parsed_query.user_parameters, &interpreter_context->ast_cache,
+                 &interpreter_context->antlr_lock);

+  auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
+  CHECK(cypher_query)
+      << "Cypher grammar should not allow other queries in EXPLAIN";

+  auto cypher_query_plan = CypherQueryToPlan(
+      parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage),
+      utils::Downcast<CypherQuery>(parsed_query.query), parsed_query.parameters,
+      &interpreter_context->plan_cache, dba);

+  std::stringstream printed_plan;
+  plan::PrettyPrint(*dba, &cypher_query_plan->plan(), &printed_plan);

+  std::vector<std::vector<TypedValue>> printed_plan_rows;
+  for (const auto &row : utils::Split(utils::RTrim(printed_plan.str()), "\n")) {
+    printed_plan_rows.push_back(std::vector<TypedValue>{TypedValue(row)});
+  }

+  summary->insert_or_assign(
+      "explain", plan::PlanToJson(*dba, &cypher_query_plan->plan()).dump());

+  SymbolTable symbol_table;
+  auto query_plan_symbol = symbol_table.CreateSymbol("QUERY PLAN", false);
+  std::vector<Symbol> output_symbols{query_plan_symbol};

+  auto output_plan =
+      std::make_unique<plan::OutputTable>(output_symbols, printed_plan_rows);

+  auto plan =
+      std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
+          std::move(output_plan), 0.0, AstStorage{}, symbol_table));

+  std::vector<std::string> header{query_plan_symbol.name()};

+  return PreparedQuery{std::move(header),
+                       std::move(parsed_query.required_privileges),
+                       [plan = std::move(plan),
+                        parameters = std::move(parsed_inner_query.parameters),
+                        output_symbols = std::move(output_symbols), summary,
+                        dba, execution_memory](AnyStream *stream) {
+                         PullAllPlan(stream, *plan, parameters, output_symbols,
+                                     false, summary, dba, execution_memory);
+                         return true;
+                       }};
+}

+PreparedQuery PrepareProfileQuery(
+    ParsedQuery parsed_query, bool in_explicit_transaction,
+    std::map<std::string, TypedValue> *summary,
+    InterpreterContext *interpreter_context, DbAccessor *dba,
+    utils::MonotonicBufferResource *execution_memory) {
+  const std::string kProfileQueryStart = "profile ";

+  CHECK(
+      utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()),
+                        kProfileQueryStart))
+      << "Expected stripped query to start with '" << kProfileQueryStart << "'";

+  if (in_explicit_transaction) {
+    throw ProfileInMulticommandTxException();
+  }

+  if (interpreter_context->is_tsc_available) {
+    throw QueryException("TSC support is missing for PROFILE");
+  }

+  // Parse and cache the inner query separately (as if it was a standalone
+  // query), producing a fresh AST. Note that currently we cannot just reuse
+  // part of the already produced AST because the parameters within ASTs are
+  // looked up using their positions within the string that was parsed. These
+  // wouldn't match up if if we were to reuse the AST (produced by parsing the
+  // full query string) when given just the inner query to execute.
+  ParsedQuery parsed_inner_query =
+      ParseQuery(parsed_query.query_string.substr(kProfileQueryStart.size()),
+                 parsed_query.user_parameters, &interpreter_context->ast_cache,
+                 &interpreter_context->antlr_lock);

+  auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
+  CHECK(cypher_query)
+      << "Cypher grammar should not allow other queries in PROFILE";

+  auto cypher_query_plan = CypherQueryToPlan(
+      parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage),
+      utils::Downcast<CypherQuery>(parsed_query.query), parsed_query.parameters,
+      &interpreter_context->plan_cache, dba);

+  // Copy the symbol table and add our own symbols (used by the `OutputTable`
+  // operator below)
+  SymbolTable symbol_table(cypher_query_plan->symbol_table());

+  auto operator_symbol = symbol_table.CreateSymbol("OPERATOR", false);
+  auto actual_hits_symbol = symbol_table.CreateSymbol("ACTUAL HITS", false);
+  auto relative_time_symbol = symbol_table.CreateSymbol("RELATIVE TIME", false);
+  auto absolute_time_symbol = symbol_table.CreateSymbol("ABSOLUTE TIME", false);

+  std::vector<Symbol> output_symbols = {operator_symbol, actual_hits_symbol,
+                                        relative_time_symbol,
+                                        absolute_time_symbol};
+  std::vector<std::string> header{
+      operator_symbol.name(), actual_hits_symbol.name(),
+      relative_time_symbol.name(), absolute_time_symbol.name()};

+  auto output_plan = std::make_unique<plan::OutputTable>(
+      output_symbols,
+      [cypher_query_plan](Frame *frame, ExecutionContext *context) {
+        utils::MonotonicBufferResource execution_memory(1 * 1024 * 1024);
+        auto cursor = cypher_query_plan->plan().MakeCursor(&execution_memory);

+        // We are pulling from another plan, so set up the EvaluationContext
+        // correctly. The rest of the context should be good for sharing.
+        context->evaluation_context.properties = NamesToProperties(
+            cypher_query_plan->ast_storage().properties_, context->db_accessor);
+        context->evaluation_context.labels = NamesToLabels(
+            cypher_query_plan->ast_storage().labels_, context->db_accessor);

+        // Pull everything to profile the execution
+        utils::Timer timer;
+        while (cursor->Pull(*frame, *context)) continue;
+        auto execution_time = timer.Elapsed();

+        context->profile_execution_time = execution_time;

+        return ProfilingStatsToTable(context->stats, execution_time);
+      });

+  auto plan =
+      std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
+          std::move(output_plan), 0.0, AstStorage{}, symbol_table));

+  return PreparedQuery{
+      std::move(header), std::move(parsed_query.required_privileges),
+      [plan = std::move(plan),
+       parameters = std::move(parsed_inner_query.parameters),
+       output_symbols = std::move(output_symbols), summary, dba,
+       execution_memory](AnyStream *stream) {
+        auto ctx = PullAllPlan(stream, *plan, parameters, output_symbols, true,
+                               summary, dba, execution_memory);

+        summary->insert_or_assign(
+            "profile",
+            ProfilingStatsToJson(ctx.stats, ctx.profile_execution_time).dump());

+        return false;
+      }};
+}

+PreparedQuery PrepareDumpQuery(
+    ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
+    InterpreterContext *interpreter_context, DbAccessor *dba,
+    utils::MonotonicBufferResource *execution_memory) {
+#ifndef MG_SINGLE_NODE_HA
+  SymbolTable symbol_table;
+  auto query_symbol = symbol_table.CreateSymbol("QUERY", false);

+  std::vector<Symbol> output_symbols = {query_symbol};
+  std::vector<std::string> header = {query_symbol.name()};

+  auto output_plan = std::make_unique<plan::OutputTableStream>(
+      output_symbols, DumpClosure(dba));
+  auto plan =
+      std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
+          std::move(output_plan), 0.0, AstStorage{}, symbol_table));

+  return PreparedQuery{
+      std::move(header), std::move(parsed_query.required_privileges),
+      [plan = std::move(plan), parameters = std::move(parsed_query.parameters),
+       output_symbols = std::move(output_symbols), summary, dba,
+       execution_memory](AnyStream *stream) {
+        PullAllPlan(stream, *plan, parameters, output_symbols, false, summary,
+                    dba, execution_memory);
+        return true;
+      }};
+#else
+  throw utils::NotYetImplemented("Dump database");
+#endif
+}

+PreparedQuery PrepareIndexQuery(
+    ParsedQuery parsed_query, bool in_explicit_transaction,
+    std::map<std::string, TypedValue> *summary,
+    InterpreterContext *interpreter_context, DbAccessor *dba,
+    utils::MonotonicBufferResource *execution_memory) {
+  if (in_explicit_transaction) {
+    throw IndexInMulticommandTxException();
+  }

+  auto *index_query = utils::Downcast<IndexQuery>(parsed_query.query);

+  // Creating an index influences computed plan costs.
+  auto invalidate_plan_cache = [plan_cache = &interpreter_context->plan_cache] {
+    auto access = plan_cache->access();
+    for (auto &kv : access) {
+      access.remove(kv.first);
+    }
+  };

+  auto callback = HandleIndexQuery(index_query, invalidate_plan_cache, dba);

   SymbolTable symbol_table;
   std::vector<Symbol> output_symbols;
   for (const auto &column : callback.header) {
     output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false"));
   }

-  plan = std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
-      std::make_unique<plan::OutputTable>(
-          output_symbols,
-          [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
-      0.0, AstStorage{}, symbol_table));
+  auto plan =
+      std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
+          std::make_unique<plan::OutputTable>(
+              output_symbols,
+              [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
+          0.0, AstStorage{}, symbol_table));

-  auto planning_time = planning_timer.Elapsed();
-  summary["planning_time"] = planning_time.count();
-  summary["cost_estimate"] = 0.0;
+  return PreparedQuery{callback.header,
+                       std::move(parsed_query.required_privileges),
+                       [callback = std::move(callback), plan = std::move(plan),
+                        parameters = std::move(parsed_query.parameters),
+                        output_symbols = std::move(output_symbols), summary,
+                        dba, execution_memory](AnyStream *stream) {
+                         PullAllPlan(stream, *plan, parameters, output_symbols,
+                                     false, summary, dba, execution_memory);
+                         return !callback.should_abort_query;
+                       }};
+}

-  return Results(db_accessor, parsed_query.parameters, plan,
-                 std::move(output_symbols), callback.header, std::move(summary),
-                 parsed_query.required_privileges, &execution_memory_,
-                 /* is_profile_query */ false, callback.should_abort_query);
+PreparedQuery PrepareAuthQuery(
+    ParsedQuery parsed_query, bool in_explicit_transaction,
+    std::map<std::string, TypedValue> *summary,
+    InterpreterContext *interpreter_context, DbAccessor *dba,
+    utils::MonotonicBufferResource *execution_memory) {
+#ifdef MG_SINGLE_NODE_HA
+  throw utils::NotYetImplemented(
+      "Managing user privileges is not yet supported in Memgraph HA "
+      "instance.");
+#else
+  if (in_explicit_transaction) {
+    throw UserModificationInMulticommandTxException();
+  }

+  auto *auth_query = utils::Downcast<AuthQuery>(parsed_query.query);

+  auto callback = HandleAuthQuery(auth_query, interpreter_context->auth,
+                                  parsed_query.parameters, dba);

+  SymbolTable symbol_table;
+  std::vector<Symbol> output_symbols;
+  for (const auto &column : callback.header) {
+    output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false"));
+  }

+  auto plan =
+      std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
+          std::make_unique<plan::OutputTable>(
+              output_symbols,
+              [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
+          0.0, AstStorage{}, symbol_table));

+  return PreparedQuery{callback.header,
+                       std::move(parsed_query.required_privileges),

+                       [callback = std::move(callback), plan = std::move(plan),
+                        parameters = std::move(parsed_query.parameters),
+                        output_symbols = std::move(output_symbols), summary,
+                        dba, execution_memory](AnyStream *stream) {
+                         PullAllPlan(stream, *plan, parameters, output_symbols,
+                                     false, summary, dba, execution_memory);
+                         return !callback.should_abort_query;
+                       }};
+#endif
+}

+PreparedQuery PrepareInfoQuery(
+    ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
+    InterpreterContext *interpreter_context, DbAccessor *dba,
+    utils::MonotonicBufferResource *execution_memory) {
+  auto *info_query = utils::Downcast<InfoQuery>(parsed_query.query);

+  auto callback = HandleInfoQuery(info_query, dba);

+  SymbolTable symbol_table;
+  std::vector<Symbol> output_symbols;
+  for (const auto &column : callback.header) {
+    output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false"));
+  }

+  auto plan =
+      std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
+          std::make_unique<plan::OutputTable>(
+              output_symbols,
+              [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
+          0.0, AstStorage{}, symbol_table));

+  return PreparedQuery{callback.header,
+                       std::move(parsed_query.required_privileges),
+                       [callback = std::move(callback), plan = std::move(plan),
+                        parameters = std::move(parsed_query.parameters),
+                        output_symbols = std::move(output_symbols), summary,
+                        dba, execution_memory](AnyStream *stream) {
+                         PullAllPlan(stream, *plan, parameters, output_symbols,
+                                     false, summary, dba, execution_memory);
+                         return !callback.should_abort_query;
+                       }};
+}

+PreparedQuery PrepareConstraintQuery(
+    ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
+    InterpreterContext *interpreter_context, DbAccessor *dba,
+    utils::MonotonicBufferResource *execution_memory) {
+  auto *constraint_query = utils::Downcast<ConstraintQuery>(parsed_query.query);

+  auto callback = HandleConstraintQuery(constraint_query, dba);

+  SymbolTable symbol_table;
+  std::vector<Symbol> output_symbols;
+  for (const auto &column : callback.header) {
+    output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false"));
+  }

+  auto plan =
+      std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
+          std::make_unique<plan::OutputTable>(
+              output_symbols,
+              [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
+          0.0, AstStorage{}, symbol_table));

+  return PreparedQuery{callback.header,
+                       std::move(parsed_query.required_privileges),
+                       [callback = std::move(callback), plan = std::move(plan),
+                        parameters = std::move(parsed_query.parameters),
+                        output_symbols = std::move(output_symbols), summary,
+                        dba, execution_memory](AnyStream *stream) {
+                         PullAllPlan(stream, *plan, parameters, output_symbols,
+                                     false, summary, dba, execution_memory);
+                         return !callback.should_abort_query;
+                       }};
+}

+void Interpreter::Prepare(const std::string &query_string,
+                          const std::map<std::string, PropertyValue> &params) {
+  summary_.clear();

+  // TODO: Set summary['type'] based on transaction metadata. The type can't be
+  // determined based only on the toplevel logical operator -- for example
+  // `MATCH DELETE RETURN`, which is a write query, will have `Produce` as its
+  // toplevel operator). For now we always set "rw" because something must be
+  // set, but it doesn't have to be correct (for Bolt clients).
+  summary_["type"] = "rw";

+  // Set a default cost estimate of 0. Individual queries can overwrite this
+  // field with an improved estimate.
+  summary_["cost_estimate"] = 0.0;

+  utils::Timer parsing_timer;
+  ParsedQuery parsed_query =
+      ParseQuery(query_string, params, &interpreter_context_->ast_cache,
+                 &interpreter_context_->antlr_lock);
+  summary_["parsing_time"] = parsing_timer.Elapsed().count();

+#ifdef MG_SINGLE_NODE_HA
+  {
+    InfoQuery *info_query = nullptr;
+    if (!execution_db_accessor_->raft()->IsLeader() &&
+        (!(info_query = utils::Downcast<InfoQuery>(parsed_query.query)) ||
+         info_query->info_type_ != InfoQuery::InfoType::RAFT)) {
+      throw raft::CantExecuteQueries();
+    }
+  }
+#endif

+  utils::Timer planning_timer;
+  PreparedQuery prepared_query;

+  if (utils::Downcast<CypherQuery>(parsed_query.query)) {
+    prepared_query = PrepareCypherQuery(
+        std::move(parsed_query), &summary_, interpreter_context_,
+        &*execution_db_accessor_, &execution_memory_);
+  } else if (utils::Downcast<ExplainQuery>(parsed_query.query)) {
+    prepared_query = PrepareExplainQuery(
+        std::move(parsed_query), &summary_, interpreter_context_,
+        &*execution_db_accessor_, &execution_memory_);
+  } else if (utils::Downcast<ProfileQuery>(parsed_query.query)) {
+    prepared_query = PrepareProfileQuery(
+        std::move(parsed_query), in_explicit_transaction_, &summary_,
+        interpreter_context_, &*execution_db_accessor_, &execution_memory_);
+  } else if (utils::Downcast<DumpQuery>(parsed_query.query)) {
+    prepared_query = PrepareDumpQuery(
+        std::move(parsed_query), &summary_, interpreter_context_,
+        &*execution_db_accessor_, &execution_memory_);
+  } else if (utils::Downcast<IndexQuery>(parsed_query.query)) {
+    prepared_query = PrepareIndexQuery(
+        std::move(parsed_query), in_explicit_transaction_, &summary_,
+        interpreter_context_, &*execution_db_accessor_, &execution_memory_);
+  } else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
+    prepared_query = PrepareAuthQuery(
+        std::move(parsed_query), in_explicit_transaction_, &summary_,
+        interpreter_context_, &*execution_db_accessor_, &execution_memory_);
+  } else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
+    prepared_query = PrepareInfoQuery(
+        std::move(parsed_query), &summary_, interpreter_context_,
+        &*execution_db_accessor_, &execution_memory_);
+  } else if (utils::Downcast<ConstraintQuery>(parsed_query.query)) {
+    prepared_query = PrepareConstraintQuery(
+        std::move(parsed_query), &summary_, interpreter_context_,
+        &*execution_db_accessor_, &execution_memory_);
+  } else {
+    LOG(FATAL) << "Should not get here -- unknown query type!";
+  }

+  summary_["planning_time"] = planning_timer.Elapsed().count();
+  prepared_query_ = std::move(prepared_query);
 }

 void Interpreter::Abort() {
-  results_ = std::nullopt;
+  prepared_query_ = std::nullopt;
   execution_memory_.Release();
   expect_rollback_ = false;
   in_explicit_transaction_ = false;
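
The per-Pull memory setup in PullAllPlan above (a monotonic buffer over a 256 KiB stack array, with a pool resource layered on top and handed to the evaluation context) has a close analogue in the standard library. Below is a minimal sketch of the same arrangement, using std::pmr as a stand-in for Memgraph's utils::MonotonicBufferResource and utils::PoolResource; the pool parameters 128 and 1024 from the diff have no direct std::pmr equivalent, so defaults are used, and the workload is a dummy vector rather than a real cursor Pull.

#include <array>
#include <cstddef>
#include <memory_resource>
#include <vector>

int main() {
  // 256 KiB of stack-backed memory, mirroring the `stack_data` buffer above.
  static constexpr std::size_t kStackSize = 256 * 1024;
  std::array<std::byte, kStackSize> stack_data;

  for (int pull = 0; pull < 3; ++pull) {
    // A fresh arena per "Pull": allocations bump-allocate from the stack
    // buffer and are released wholesale when the resources go out of scope.
    std::pmr::monotonic_buffer_resource monotonic(stack_data.data(),
                                                  stack_data.size());
    std::pmr::unsynchronized_pool_resource pool(&monotonic);

    std::pmr::vector<int> values(&pool);  // stands in for the evaluated row
    values.assign({1, 2, 3});
  }
}

The point of the arrangement, per the comment in the diff, is that a single Pull's temporary allocations fit comfortably in the stack buffer and disappear all at once at the end of each loop iteration.
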
@@ -1153,7 +1408,7 @@ void Interpreter::Abort() {
 }

 void Interpreter::Commit() {
-  results_ = std::nullopt;
+  prepared_query_ = std::nullopt;
   execution_memory_.Release();
   if (!db_accessor_) return;
 #ifdef MG_SINGLE_NODE_V2
@@ -1178,14 +1433,14 @@ void Interpreter::Commit() {
 }

 void Interpreter::AdvanceCommand() {
-  results_ = std::nullopt;
+  prepared_query_ = std::nullopt;
   execution_memory_.Release();
   if (!db_accessor_) return;
   db_accessor_->AdvanceCommand();
 }

 void Interpreter::AbortCommand() {
-  results_ = std::nullopt;
+  prepared_query_ = std::nullopt;
   execution_memory_.Release();
   if (in_explicit_transaction_) {
     expect_rollback_ = true;
@@ -1194,40 +1449,4 @@ void Interpreter::AbortCommand() {
   }
 }

-std::shared_ptr<CachedPlan> Interpreter::CypherQueryToPlan(
-    HashType query_hash, CypherQuery *query, AstStorage ast_storage,
-    const Parameters &parameters, DbAccessor *db_accessor) {
-  auto plan_cache_access = interpreter_context_->plan_cache.access();
-  auto it = plan_cache_access.find(query_hash);
-  if (it != plan_cache_access.end()) {
-    if (it->second->IsExpired()) {
-      plan_cache_access.remove(query_hash);
-    } else {
-      return it->second;
-    }
-  }
-  return plan_cache_access
-      .insert({query_hash,
-               std::make_shared<CachedPlan>(MakeLogicalPlan(
-                   query, std::move(ast_storage), parameters, db_accessor))})
-      .first->second;
-}
-
-std::unique_ptr<LogicalPlan> Interpreter::MakeLogicalPlan(
-    CypherQuery *query, AstStorage ast_storage, const Parameters &parameters,
-    DbAccessor *db_accessor) {
-  auto vertex_counts = plan::MakeVertexCountCache(db_accessor);
-
-  auto symbol_table = MakeSymbolTable(query);
-
-  auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table,
-                                                    query, &vertex_counts);
-  std::unique_ptr<plan::LogicalOperator> root;
-  double cost;
-  std::tie(root, cost) = plan::MakeLogicalPlan(&planning_context, parameters,
-                                               FLAGS_query_cost_planner);
-  return std::make_unique<SingleNodeLogicalPlan>(
-      std::move(root), cost, std::move(ast_storage), std::move(symbol_table));
-}
-
 } // namespace query

@@ -11,7 +11,6 @@
 #include "query/frontend/stripped.hpp"
 #include "query/interpret/frame.hpp"
 #include "query/plan/operator.hpp"
-#include "utils/likely.hpp"
 #include "utils/memory.hpp"
 #include "utils/skip_list.hpp"
 #include "utils/spin_lock.hpp"
@@ -29,6 +28,60 @@ namespace query {

 static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U;

+/**
+ * `AnyStream` can wrap *any* type implementing the `Stream` concept into a
+ * single type.
+ *
+ * The type erasure technique is used. The original type which an `AnyStream`
+ * was constructed from is "erased", as `AnyStream` is not a class template and
+ * doesn't use the type in any way. Client code can then program just for
+ * `AnyStream`, rather than using static polymorphism to handle any type
+ * implementing the `Stream` concept.
+ */
+class AnyStream final {
+ public:
+  template <class TStream>
+  AnyStream(TStream *stream, utils::MemoryResource *memory_resource)
+      : content_{utils::Allocator<GenericWrapper<TStream>>{memory_resource}
+                     .template new_object<GenericWrapper<TStream>>(stream),
+                 [memory_resource](Wrapper *ptr) {
+                   utils::Allocator<GenericWrapper<TStream>>{memory_resource}
+                       .template delete_object<GenericWrapper<TStream>>(
+                           static_cast<GenericWrapper<TStream> *>(ptr));
+                 }} {}
+
+  void Result(const std::vector<TypedValue> &values) {
+    content_->Result(values);
+  }
+
+ private:
+  struct Wrapper {
+    virtual void Result(const std::vector<TypedValue> &values) = 0;
+  };
+
+  template <class TStream>
+  struct GenericWrapper final : public Wrapper {
+    explicit GenericWrapper(TStream *stream) : stream_{stream} {}
+
+    void Result(const std::vector<TypedValue> &values) override {
+      stream_->Result(values);
+    }
+
+    TStream *stream_;
+  };
+
+  std::unique_ptr<Wrapper, std::function<void(Wrapper *)>> content_;
+};
+
+/**
+ * A container for data related to the preparation of a query.
+ */
+struct PreparedQuery {
+  std::vector<std::string> header;
+  std::vector<AuthQuery::Privilege> privileges;
+  std::function<bool(AnyStream *stream)> query_handler;
+};
+
 // TODO: Maybe this should move to query/plan/planner.
 /// Interface for accessing the root operator of a logical plan.
 class LogicalPlan {
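
The trickiest part of `AnyStream` above is its `content_` member: a `std::unique_ptr` whose deleter is a `std::function`, so that both the wrapped stream's concrete type and the way the wrapper must be deallocated (through the supplied memory resource) are erased. A standalone sketch of that idea, using C++17 `std::pmr::polymorphic_allocator` in place of Memgraph's `utils::Allocator`; the `Wrapper`/`ConcreteWrapper` types here are illustrative, not Memgraph code.

#include <functional>
#include <memory>
#include <memory_resource>

struct Wrapper {
  virtual ~Wrapper() = default;
};

struct ConcreteWrapper final : Wrapper {
  int payload = 42;
};

int main() {
  std::pmr::monotonic_buffer_resource memory;
  std::pmr::polymorphic_allocator<ConcreteWrapper> alloc(&memory);

  // Construct the concrete wrapper inside `memory`.
  ConcreteWrapper *raw = alloc.allocate(1);
  alloc.construct(raw);

  // Erase both the concrete type and the deallocation strategy: the deleter
  // is a std::function that knows which memory resource the object must be
  // handed back to, which is the role of AnyStream's `content_` member.
  std::unique_ptr<Wrapper, std::function<void(Wrapper *)>> content(
      raw, [&memory](Wrapper *ptr) {
        auto *concrete = static_cast<ConcreteWrapper *>(ptr);
        concrete->~ConcreteWrapper();
        std::pmr::polymorphic_allocator<ConcreteWrapper>(&memory).deallocate(
            concrete, 1);
      });
}
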
@@ -121,10 +174,10 @@ struct InterpreterContext {
   database::GraphDb *db;
 #endif

-  // Antlr has singleton instance that is shared between threads. It is
-  // protected by locks inside of antlr. Unfortunately, they are not protected
-  // in a very good way. Once we have antlr version without race conditions we
-  // can remove this lock. This will probably never happen since antlr
+  // ANTLR has singleton instance that is shared between threads. It is
+  // protected by locks inside of ANTLR. Unfortunately, they are not protected
+  // in a very good way. Once we have ANTLR version without race conditions we
+  // can remove this lock. This will probably never happen since ANTLR
   // developers introduce more bugs in each version. Fortunately, we have
   // cache so this lock probably won't impact performance much...
   utils::SpinLock antlr_lock;
@ -136,208 +189,58 @@ struct InterpreterContext {
|
|||||||
utils::SkipList<PlanCacheEntry> plan_cache;
|
utils::SkipList<PlanCacheEntry> plan_cache;
|
||||||
};
|
};
|
||||||
|
|
||||||
class Interpreter {
|
class Interpreter final {
|
||||||
public:
|
public:
|
||||||
/**
|
|
||||||
* Encapsulates all what's necessary for the interpretation of a query
|
|
||||||
* into a single object that can be pulled (into the given Stream).
|
|
||||||
*/
|
|
||||||
class Results {
|
|
||||||
friend Interpreter;
|
|
||||||
Results(DbAccessor *db_accessor, const query::Parameters ¶meters,
|
|
||||||
-            std::shared_ptr<CachedPlan> plan,
-            std::vector<Symbol> output_symbols, std::vector<std::string> header,
-            std::map<std::string, TypedValue> summary,
-            std::vector<AuthQuery::Privilege> privileges,
-            utils::MemoryResource *execution_memory,
-            bool is_profile_query = false, bool should_abort_query = false)
-        : ctx_{db_accessor},
-          plan_(plan),
-          cursor_(plan_->plan().MakeCursor(execution_memory)),
-          frame_(plan_->symbol_table().max_position(), execution_memory),
-          output_symbols_(std::move(output_symbols)),
-          header_(std::move(header)),
-          summary_(std::move(summary)),
-          privileges_(std::move(privileges)),
-          should_abort_query_(should_abort_query) {
-      ctx_.is_profile_query = is_profile_query;
-      ctx_.symbol_table = plan_->symbol_table();
-      ctx_.evaluation_context.timestamp =
-          std::chrono::duration_cast<std::chrono::milliseconds>(
-              std::chrono::system_clock::now().time_since_epoch())
-              .count();
-      ctx_.evaluation_context.parameters = parameters;
-      ctx_.evaluation_context.properties =
-          NamesToProperties(plan_->ast_storage().properties_, db_accessor);
-      ctx_.evaluation_context.labels =
-          NamesToLabels(plan_->ast_storage().labels_, db_accessor);
-    }
-
-   public:
-    Results(const Results &) = delete;
-    Results(Results &&) = default;
-    Results &operator=(const Results &) = delete;
-    Results &operator=(Results &&) = default;
-
-    /**
-     * Make the interpreter perform a single Pull. Results (if they exists) are
-     * pushed into the given stream. On first Pull the header is written to the
-     * stream, on last the summary.
-     *
-     * @param stream - The stream to push the header, results and summary into.
-     * @return - If this Results is eligible for another Pull. If Pulling
-     * after `false` has been returned, the behavior is undefined.
-     * @tparam TStream - Stream type.
-     */
-    template <typename TStream>
-    bool Pull(TStream &stream) {
-      utils::Timer timer;
-      // Setup temporary memory for a single Pull. Initial memory should come
-      // from stack, 256 KiB should fit on the stack and should be more than
-      // enough for a single Pull.
-      constexpr size_t stack_size = 256 * 1024;
-      char stack_data[stack_size];
-      utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size);
-      // TODO (mferencevic): Tune the parameters accordingly.
-      utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
-      ctx_.evaluation_context.memory = &pool_memory;
-      // We can now Pull a result.
-      bool return_value = cursor_->Pull(frame_, ctx_);
-      if (return_value && !output_symbols_.empty()) {
-        // TODO: The streamed values should also probably use the above memory.
-        std::vector<TypedValue> values;
-        values.reserve(output_symbols_.size());
-        for (const auto &symbol : output_symbols_) {
-          values.emplace_back(frame_[symbol]);
-        }
-        stream.Result(values);
-      }
-      execution_time_ += timer.Elapsed().count();
-
-      if (!return_value) {
-        summary_["plan_execution_time"] = execution_time_;
-
-        if (ctx_.is_profile_query) {
-          summary_["profile"] =
-              ProfilingStatsToJson(ctx_.stats, ctx_.profile_execution_time)
-                  .dump();
-        }
-
-        cursor_->Shutdown();
-      }
-
-      return return_value;
-    }
-
-    /** Calls Pull() until exhausted. */
-    template <typename TStream>
-    void PullAll(TStream &stream) {
-      while (Pull(stream)) continue;
-    }
-
-    const std::vector<std::string> &header() const & { return header_; }
-    std::vector<std::string> &&header() && { return std::move(header_); }
-    const std::map<std::string, TypedValue> &summary() const & {
-      return summary_;
-    }
-    std::map<std::string, TypedValue> &&summary() && {
-      return std::move(summary_);
-    }
-
-    const std::vector<AuthQuery::Privilege> &privileges() {
-      return privileges_;
-    }
-
-    bool ShouldAbortQuery() const { return should_abort_query_; }
-
-   private:
-    ExecutionContext ctx_;
-    std::shared_ptr<CachedPlan> plan_;
-    query::plan::UniqueCursorPtr cursor_;
-    Frame frame_;
-    std::vector<Symbol> output_symbols_;
-
-    std::vector<std::string> header_;
-    std::map<std::string, TypedValue> summary_;
-
-    double execution_time_{0};
-
-    std::vector<AuthQuery::Privilege> privileges_;
-
-    bool should_abort_query_;
-  };
-
 
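The `PreparedQuery` type that replaces `Results` is introduced elsewhere in this diff and is not visible in this hunk. Judging only from how `prepared_query_` is used in the updated `Interpret` and `PullAll` (its `header` and `privileges` members, and a `query_handler` that is called with an `AnyStream *` and returns a commit flag), a minimal sketch of its shape might be:

    // Sketch only: inferred from the usages of `prepared_query_` in this diff,
    // not copied from the actual header. `AuthQuery::Privilege` and `AnyStream`
    // come from the query engine's own headers.
    #include <functional>
    #include <string>
    #include <vector>
    struct PreparedQuery {
      std::vector<std::string> header;
      std::vector<AuthQuery::Privilege> privileges;
      // Streams the results into the wrapped stream; returns true when the
      // implicit transaction should be committed, false when it should abort.
      std::function<bool(AnyStream *stream)> query_handler;
    };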
   explicit Interpreter(InterpreterContext *interpreter_context);
   Interpreter(const Interpreter &) = delete;
   Interpreter &operator=(const Interpreter &) = delete;
   Interpreter(Interpreter &&) = delete;
   Interpreter &operator=(Interpreter &&) = delete;
-  virtual ~Interpreter() { Abort(); }
+  ~Interpreter() { Abort(); }
 
   std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>>
   Interpret(const std::string &query,
             const std::map<std::string, PropertyValue> &params);
 
   /**
-   * Generates an Results object for the parameters. The resulting object
-   * can be Pulled with its results written to an arbitrary stream.
+   * Prepare a query for execution.
+   *
+   * To prepare a query for execution means to preprocess the query and adjust
+   * the state of the `Interpreter` in such a way so that the next call to
+   * `PullAll` executes the query.
+   *
+   * @throw raft::CantExecuteQueries if the Memgraph instance is not a Raft
+   * leader and a query other than an Info Raft query was given
+   * @throw query::QueryException
    */
-  virtual Results Prepare(const std::string &query,
-                          const std::map<std::string, PropertyValue> &params,
-                          DbAccessor *db_accessor);
+  void Prepare(const std::string &query,
+               const std::map<std::string, PropertyValue> &params);
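With this signature change `Prepare` no longer returns a value; it is expected to fill `prepared_query_` with a handler that the next call to `PullAll` runs. A rough, hypothetical sketch of what the tail of `Prepare` might do for a Cypher query, reusing the `PullAllPlan` helper added to `interpreter.cpp` in this same revision (the local variables `header`, `privileges`, `plan`, `parameters`, and `output_symbols` are assumed to have been computed earlier in `Prepare`):

    // Hypothetical sketch; only `prepared_query_`, `PreparedQuery` and
    // `PullAllPlan` are taken from this diff, everything else is illustrative.
    prepared_query_ = PreparedQuery{
        std::move(header), std::move(privileges),
        [this, plan, parameters, output_symbols](AnyStream *stream) {
          PullAllPlan(stream, *plan, parameters, output_symbols,
                      /* is_profile_query */ false, &summary_,
                      &*execution_db_accessor_, &execution_memory_);
          return true;  // commit unless a later check decides otherwise
        }};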
 
+  /**
+   * Execute the last prepared query and stream *all* of the results into the
+   * given stream.
+   *
+   * TStream should be a type implementing the `Stream` concept, i.e. it should
+   * contain the member function `void Result(const std::vector<TypedValue> &)`.
+   * The provided vector argument is valid only for the duration of the call to
+   * `Result`. The stream should make an explicit copy if it wants to use it
+   * further.
+   *
+   * @throw utils::BasicException
+   * @throw query::QueryException
+   */
   template <typename TStream>
-  std::map<std::string, TypedValue> PullAll(TStream *result_stream) {
-    // If we don't have any results (eg. a transaction command preceeded),
-    // return an empty summary.
-    if (UNLIKELY(!results_)) return {};
-
-    // Stream all results and return the summary.
-    try {
-      results_->PullAll(*result_stream);
-      // Make a copy of the summary because the `Commit` call will destroy the
-      // `results_` object.
-      auto summary = results_->summary();
-      if (!in_explicit_transaction_) {
-        if (results_->ShouldAbortQuery()) {
-          Abort();
-        } else {
-          Commit();
-        }
-      }
-
-      return summary;
-#ifdef MG_SINGLE_NODE_HA
-    } catch (const query::HintedAbortError &) {
-      AbortCommand();
-      throw utils::BasicException("Transaction was asked to abort.");
-#endif
-    } catch (const utils::BasicException &) {
-      AbortCommand();
-      throw;
-    }
-  }
+  std::map<std::string, TypedValue> PullAll(TStream *result_stream);
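The `Stream` concept referenced in the new comment is purely duck-typed, so any type with a matching `Result` member function can be handed to `PullAll`. A minimal, hypothetical stream that simply copies every row could look like this (`CollectingStream` is illustrative and not part of the codebase):

    // Illustrative only; `TypedValue` comes from the query engine's headers.
    #include <vector>
    struct CollectingStream {
      std::vector<std::vector<TypedValue>> rows;

      // Called once per result row; the argument is only valid for the
      // duration of the call, so the row is copied.
      void Result(const std::vector<TypedValue> &values) {
        rows.emplace_back(values.begin(), values.end());
      }
    };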
 
+  /**
+   * Abort the current multicommand transaction.
+   */
   void Abort();
 
- protected:
-  // high level tree -> logical plan
-  // AstStorage and SymbolTable may be modified during planning. The created
-  // LogicalPlan must take ownership of AstStorage and SymbolTable.
-  virtual std::unique_ptr<LogicalPlan> MakeLogicalPlan(CypherQuery *,
-                                                       AstStorage,
-                                                       const Parameters &,
-                                                       DbAccessor *);
-
-  virtual void PrettyPrintPlan(const DbAccessor &,
-                               const plan::LogicalOperator *, std::ostream *);
-
-  virtual std::string PlanToJson(const DbAccessor &,
-                                 const plan::LogicalOperator *);
-
  private:
   InterpreterContext *interpreter_context_;
+  std::optional<PreparedQuery> prepared_query_;
+  std::map<std::string, TypedValue> summary_;
 
 #ifdef MG_SINGLE_NODE_V2
   std::optional<storage::Storage::Accessor> db_accessor_;
@@ -345,11 +248,6 @@ class Interpreter {
   std::optional<database::GraphDbAccessor> db_accessor_;
 #endif
   std::optional<DbAccessor> execution_db_accessor_;
-  // The `query::Interpreter::Results` object MUST be destroyed before the
-  // `database::GraphDbAccessor` is destroyed because the `Results` object holds
-  // references to the `GraphDb` object and will crash the database when
-  // destructed if you are not careful.
-  std::optional<Results> results_;
   bool in_explicit_transaction_{false};
   bool expect_rollback_{false};
   utils::MonotonicBufferResource execution_memory_{kExecutionMemoryBlockSize};
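The removed comment was leaning on a plain C++ rule: non-static data members are destroyed in reverse order of declaration, so `results_`, declared after the accessor members, was destroyed before them only for as long as nobody reordered the fields. A small standalone illustration of that rule, unrelated to any Memgraph type:

    #include <iostream>
    struct Noisy {
      const char *name;
      ~Noisy() { std::cout << name << " destroyed\n"; }
    };
    struct Holder {
      Noisy first{"first"};    // destroyed last
      Noisy second{"second"};  // destroyed first
    };
    // Destroying a Holder prints "second destroyed" and then "first destroyed".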
@@ -357,13 +255,38 @@ class Interpreter {
   void Commit();
   void AdvanceCommand();
   void AbortCommand();
-
-  // high level tree -> CachedPlan
-  std::shared_ptr<CachedPlan> CypherQueryToPlan(HashType query_hash,
-                                                CypherQuery *query,
-                                                AstStorage ast_storage,
-                                                const Parameters &parameters,
-                                                DbAccessor *db_accessor);
 };
 
+template <typename TStream>
+std::map<std::string, TypedValue> Interpreter::PullAll(TStream *result_stream) {
+  // If we don't have any results (eg. a transaction command preceeded),
+  // return an empty summary.
+  if (!prepared_query_) return {};
+
+  try {
+    // Wrap the (statically polymorphic) stream type into a common type which
+    // the handler knows.
+    AnyStream stream{result_stream, &execution_memory_};
+    bool commit = prepared_query_->query_handler(&stream);
+
+    if (!in_explicit_transaction_) {
+      if (commit) {
+        Commit();
+      } else {
+        Abort();
+      }
+    }
+
+    return summary_;
+#ifdef MG_SINGLE_NODE_HA
+  } catch (const query::HintedAbortError &) {
+    AbortCommand();
+    throw utils::BasicException("Transaction was asked to abort.");
+#endif
+  } catch (const utils::BasicException &) {
+    AbortCommand();
+    throw;
+  }
+}
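`AnyStream`, added elsewhere in this revision, is the type-erasing wrapper that lets the stored `query_handler` accept any `TStream` without being a template itself. A minimal sketch of the type-erasure idea, assuming only the `Result` member of the `Stream` concept needs forwarding (the real class in this diff also receives the memory resource passed above, which this sketch ignores):

    // Sketch of the idea only; not the actual AnyStream from this diff.
    #include <functional>
    #include <vector>
    class AnyStreamSketch {
     public:
      template <typename TStream>
      explicit AnyStreamSketch(TStream *stream)
          : result_([stream](const std::vector<TypedValue> &values) {
              stream->Result(values);
            }) {}

      void Result(const std::vector<TypedValue> &values) { result_(values); }

     private:
      std::function<void(const std::vector<TypedValue> &)> result_;
    };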
+
 } // namespace query
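Put together, the new two-step flow from a caller's point of view might look like this; the `CollectingStream` type sketched earlier and the surrounding setup are assumptions made purely for illustration:

    // Hypothetical caller, assuming an already constructed InterpreterContext.
    query::Interpreter interpreter{&interpreter_context};
    CollectingStream stream;

    // Prepare the query and get the column names plus the required privileges.
    auto [header, privileges] =
        interpreter.Interpret("MATCH (n) RETURN n;", /* params */ {});

    // Execute it: every row is pushed into `stream`, the summary is returned.
    auto summary = interpreter.PullAll(&stream);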