Refactor Interpreter

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D896
This commit is contained in:
Mislav Bradac 2017-10-10 17:57:15 +02:00
parent cce6db3442
commit e078c9f7cd
3 changed files with 100 additions and 101 deletions

View File

@ -13,6 +13,12 @@
*/
class ResultStreamFaker {
public:
ResultStreamFaker() = default;
ResultStreamFaker(const ResultStreamFaker &) = delete;
ResultStreamFaker &operator=(const ResultStreamFaker &) = delete;
ResultStreamFaker(ResultStreamFaker &&) = default;
ResultStreamFaker &operator=(ResultStreamFaker &&) = default;
void Header(const std::vector<std::string> &fields) {
debug_assert(current_state_ == State::Start,
"Headers can only be written in the beginning");
@ -32,14 +38,14 @@ class ResultStreamFaker {
current_state_ = State::Done;
}
const auto &GetHeader() {
const auto &GetHeader() const {
debug_assert(current_state_ != State::Start, "Header not written");
return header_;
}
const auto &GetResults() { return results_; }
const auto &GetResults() const { return results_; }
const auto &GetSummary() {
const auto &GetSummary() const {
debug_assert(current_state_ == State::Done, "Summary not written");
return summary_;
}

View File

@ -63,7 +63,7 @@ std::string TypedValueToString(const query::TypedValue &value) {
/**
* Prints out all the given results to standard out.
*/
void PrintResults(ResultStreamFaker results) {
void PrintResults(const ResultStreamFaker &results) {
const std::vector<std::string> &header = results.GetHeader();
std::vector<int> column_widths(header.size());
std::transform(header.begin(), header.end(), column_widths.begin(),

View File

@ -55,7 +55,11 @@ class Interpreter {
};
public:
Interpreter() {}
Interpreter() = default;
Interpreter(const Interpreter &) = delete;
Interpreter &operator=(const Interpreter &) = delete;
Interpreter(Interpreter &&) = delete;
Interpreter &operator=(Interpreter &&) = delete;
template <typename Stream>
void Interpret(const std::string &query, GraphDbAccessor &db_accessor,
@ -64,8 +68,8 @@ class Interpreter {
bool in_explicit_transaction) {
utils::Timer frontend_timer;
Context ctx(db_accessor);
ctx.is_query_cached_ = true;
ctx.in_explicit_transaction_ = in_explicit_transaction;
ctx.is_query_cached_ = true;
std::map<std::string, TypedValue> summary;
// query -> stripped query
@ -82,119 +86,55 @@ class Interpreter {
ctx.parameters_.Add(param_pair.first, param_it->second);
}
std::shared_ptr<CachedPlan> cached_plan;
std::experimental::optional<AstTreeStorage> ast_storage;
// Check if we have a cached logical plan ready, so that we can skip the
// whole query -> AST -> logical_plan process.
auto plan_cache_accessor = plan_cache_.access();
auto plan_cache_it = plan_cache_accessor.find(stripped.hash());
if (plan_cache_it != plan_cache_accessor.end() &&
plan_cache_it->second->IsExpired()) {
// Remove the expired plan.
plan_cache_accessor.remove(stripped.hash());
plan_cache_it = plan_cache_accessor.end();
}
if (plan_cache_it == plan_cache_accessor.end()) {
// We didn't find a cached plan or it was expired.
// stripped query -> high level tree
ast_storage = QueryToAst(stripped, ctx);
} else {
cached_plan = plan_cache_it->second;
}
auto cached_plan = [&]() -> std::shared_ptr<CachedPlan> {
auto plan_cache_accessor = plan_cache_.access();
auto plan_cache_it = plan_cache_accessor.find(stripped.hash());
if (plan_cache_it != plan_cache_accessor.end() &&
plan_cache_it->second->IsExpired()) {
// Remove the expired plan.
plan_cache_accessor.remove(stripped.hash());
plan_cache_it = plan_cache_accessor.end();
}
if (plan_cache_it != plan_cache_accessor.end()) {
return plan_cache_it->second;
}
return nullptr;
}();
auto frontend_time = frontend_timer.Elapsed();
utils::Timer planning_timer;
auto fill_symbol_table = [](auto &ast_storage, auto &symbol_table) {
SymbolGenerator symbol_generator(symbol_table);
if (!cached_plan) {
AstTreeStorage ast_storage = QueryToAst(stripped, ctx);
SymbolGenerator symbol_generator(ctx.symbol_table_);
ast_storage.query()->Accept(symbol_generator);
};
// If the plan is not stored in the cache, `tmp_logical_plan` owns the newly
// generated plan. Otherwise, it is empty and `cached_plan` owns the plan.
// In all cases, `logical_plan` references the plan to be used.
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
const plan::LogicalOperator *logical_plan = nullptr;
double query_plan_cost_estimation = 0.0;
if (FLAGS_query_plan_cache) {
if (!cached_plan) {
debug_assert(ast_storage, "AST is required to generate a plan");
fill_symbol_table(*ast_storage, ctx.symbol_table_);
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
MakeLogicalPlan(*ast_storage, db_accessor, ctx);
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
double query_plan_cost_estimation = 0.0;
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
MakeLogicalPlan(ast_storage, db_accessor, ctx);
cached_plan = std::make_shared<CachedPlan>(
std::move(tmp_logical_plan), query_plan_cost_estimation,
ctx.symbol_table_, std::move(ast_storage));
if (FLAGS_query_plan_cache) {
// Cache the generated plan.
auto plan_cache_accessor = plan_cache_.access();
auto plan_cache_it =
plan_cache_accessor
.insert(
stripped.hash(),
std::make_shared<CachedPlan>(
std::move(tmp_logical_plan), query_plan_cost_estimation,
ctx.symbol_table_, std::move(*ast_storage)))
.first;
plan_cache_accessor.insert(stripped.hash(), cached_plan).first;
cached_plan = plan_cache_it->second;
}
query_plan_cost_estimation = cached_plan->cost();
ctx.symbol_table_ = cached_plan->symbol_table();
logical_plan = &cached_plan->plan();
} else {
debug_assert(ast_storage, "Without plan caching, AST must be generated.");
fill_symbol_table(*ast_storage, ctx.symbol_table_);
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
MakeLogicalPlan(*ast_storage, db_accessor, ctx);
logical_plan = tmp_logical_plan.get();
}
// Below this point, ast_storage should not be used. Other than not allowing
// modifications, the ast_storage may have moved to a cache.
ctx.symbol_table_ = cached_plan->symbol_table();
// Generate frame based on symbol table max_position.
Frame frame(ctx.symbol_table_.max_position());
auto planning_time = planning_timer.Elapsed();
utils::Timer execution_timer;
std::vector<std::string> header;
std::vector<Symbol> output_symbols(
logical_plan->OutputSymbols(ctx.symbol_table_));
if (!output_symbols.empty()) {
// Since we have output symbols, this means that the query contains RETURN
// clause, so stream out the results.
// Generate header.
for (const auto &symbol : output_symbols) {
// When the symbol is aliased or expanded from '*' (inside RETURN or
// WITH), then there is no token position, so use symbol name.
// Otherwise, find the name from stripped query.
header.push_back(utils::FindOr(stripped.named_expressions(),
symbol.token_position(), symbol.name())
.first);
}
stream.Header(header);
// Stream out results.
auto cursor = logical_plan->MakeCursor(db_accessor);
while (cursor->Pull(frame, ctx)) {
std::vector<TypedValue> values;
for (const auto &symbol : output_symbols)
values.emplace_back(frame[symbol]);
stream.Result(values);
}
} else if (dynamic_cast<const plan::CreateNode *>(logical_plan) ||
dynamic_cast<const plan::CreateExpand *>(logical_plan) ||
dynamic_cast<const plan::SetProperty *>(logical_plan) ||
dynamic_cast<const plan::SetProperties *>(logical_plan) ||
dynamic_cast<const plan::SetLabels *>(logical_plan) ||
dynamic_cast<const plan::RemoveProperty *>(logical_plan) ||
dynamic_cast<const plan::RemoveLabels *>(logical_plan) ||
dynamic_cast<const plan::Delete *>(logical_plan) ||
dynamic_cast<const plan::Merge *>(logical_plan) ||
dynamic_cast<const plan::CreateIndex *>(logical_plan)) {
stream.Header(header);
auto cursor = logical_plan->MakeCursor(db_accessor);
while (cursor->Pull(frame, ctx)) continue;
} else {
throw QueryRuntimeException("Unknown top level LogicalOperator");
}
ExecutePlan(stream, &cached_plan->plan(), ctx, stripped);
auto execution_time = execution_timer.Elapsed();
if (ctx.is_index_created_) {
@ -209,7 +149,7 @@ class Interpreter {
summary["parsing_time"] = frontend_time.count();
summary["planning_time"] = planning_time.count();
summary["plan_execution_time"] = execution_time.count();
summary["cost_estimate"] = query_plan_cost_estimation;
summary["cost_estimate"] = cached_plan->cost();
// TODO: set summary['type'] based on transaction metadata
// the type can't be determined based only on top level LogicalOp
@ -231,6 +171,59 @@ class Interpreter {
std::pair<std::unique_ptr<plan::LogicalOperator>, double> MakeLogicalPlan(
AstTreeStorage &, const GraphDbAccessor &, Context &);
template <typename Stream>
void ExecutePlan(Stream &stream, const plan::LogicalOperator *logical_plan,
Context &ctx, const StrippedQuery &stripped) {
// Generate frame based on symbol table max_position.
Frame frame(ctx.symbol_table_.max_position());
std::vector<std::string> header;
std::vector<Symbol> output_symbols(
logical_plan->OutputSymbols(ctx.symbol_table_));
if (!output_symbols.empty()) {
// Since we have output symbols, this means that the query contains RETURN
// clause, so stream out the results.
// Generate header.
for (const auto &symbol : output_symbols) {
// When the symbol is aliased or expanded from '*' (inside RETURN or
// WITH), then there is no token position, so use symbol name.
// Otherwise, find the name from stripped query.
header.push_back(utils::FindOr(stripped.named_expressions(),
symbol.token_position(), symbol.name())
.first);
}
stream.Header(header);
// Stream out results.
auto cursor = logical_plan->MakeCursor(ctx.db_accessor_);
while (cursor->Pull(frame, ctx)) {
std::vector<TypedValue> values;
for (const auto &symbol : output_symbols) {
values.emplace_back(frame[symbol]);
}
stream.Result(values);
}
return;
}
if (dynamic_cast<const plan::CreateNode *>(logical_plan) ||
dynamic_cast<const plan::CreateExpand *>(logical_plan) ||
dynamic_cast<const plan::SetProperty *>(logical_plan) ||
dynamic_cast<const plan::SetProperties *>(logical_plan) ||
dynamic_cast<const plan::SetLabels *>(logical_plan) ||
dynamic_cast<const plan::RemoveProperty *>(logical_plan) ||
dynamic_cast<const plan::RemoveLabels *>(logical_plan) ||
dynamic_cast<const plan::Delete *>(logical_plan) ||
dynamic_cast<const plan::Merge *>(logical_plan) ||
dynamic_cast<const plan::CreateIndex *>(logical_plan)) {
stream.Header(header);
auto cursor = logical_plan->MakeCursor(ctx.db_accessor_);
while (cursor->Pull(frame, ctx)) continue;
} else {
throw QueryRuntimeException("Unknown top level LogicalOperator");
}
}
ConcurrentMap<HashType, AstTreeStorage> ast_cache_;
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> plan_cache_;
// Antlr has singleton instance that is shared between threads. It is