Use plan rewrite hook to generate DistributedPlan

Reviewers: mtomic, llugovic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1826
This commit is contained in:
Teon Banek 2019-01-22 16:24:05 +01:00
parent abc9f0b0e7
commit 743d82b78d

View File

@ -56,6 +56,58 @@ class DistributedLogicalPlan final : public LogicalPlan {
double cost_; double cost_;
}; };
class DistributedPostProcessor final {
// Original plan before rewrite, needed only for temporary cost estimation
// implementation.
std::unique_ptr<plan::LogicalOperator> original_plan_;
std::atomic<int64_t> *next_plan_id_;
Parameters parameters_;
public:
using ProcessedPlan = plan::DistributedPlan;
DistributedPostProcessor(const Parameters &parameters,
std::atomic<int64_t> *next_plan_id)
: next_plan_id_(next_plan_id), parameters_(parameters) {}
template <class TPlanningContext>
plan::DistributedPlan Rewrite(std::unique_ptr<plan::LogicalOperator> plan,
TPlanningContext *context) {
original_plan_ = std::move(plan);
const auto &property_names = context->ast_storage->properties_;
std::vector<storage::Property> properties_by_ix;
properties_by_ix.reserve(property_names.size());
for (const auto &name : property_names) {
properties_by_ix.push_back(context->db->Property(name));
}
return MakeDistributedPlan(*context->ast_storage, *original_plan_,
*context->symbol_table, *next_plan_id_,
properties_by_ix);
}
template <class TVertexCounts>
double EstimatePlanCost(const plan::DistributedPlan &plan,
TVertexCounts *vertex_counts) {
// TODO: Make cost estimation work with distributed plan.
return ::query::plan::EstimatePlanCost(vertex_counts, parameters_,
*original_plan_);
}
template <class TPlanningContext>
plan::DistributedPlan MergeWithCombinator(plan::DistributedPlan curr_plan,
plan::DistributedPlan last_plan,
const Tree &combinator,
TPlanningContext *context) {
throw utils::NotYetImplemented("query combinator");
}
template <class TPlanningContext>
plan::DistributedPlan MakeDistinct(plan::DistributedPlan last_op,
TPlanningContext *context) {
throw utils::NotYetImplemented("query combinator");
}
};
} // namespace } // namespace
DistributedInterpreter::DistributedInterpreter(database::Master *db) DistributedInterpreter::DistributedInterpreter(database::Master *db)
@ -65,20 +117,15 @@ std::unique_ptr<LogicalPlan> DistributedInterpreter::MakeLogicalPlan(
CypherQuery *query, AstStorage ast_storage, const Parameters &parameters, CypherQuery *query, AstStorage ast_storage, const Parameters &parameters,
database::GraphDbAccessor *db_accessor) { database::GraphDbAccessor *db_accessor) {
auto vertex_counts = plan::MakeVertexCountCache(db_accessor); auto vertex_counts = plan::MakeVertexCountCache(db_accessor);
auto symbol_table = MakeSymbolTable(query); auto symbol_table = MakeSymbolTable(query);
auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table,
query, &vertex_counts); query, &vertex_counts);
DistributedPostProcessor distributed_post_processor(parameters,
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan; &next_plan_id_);
plan::DistributedPlan plan;
double cost; double cost;
std::tie(tmp_logical_plan, cost) = plan::MakeLogicalPlan( std::tie(plan, cost) = plan::MakeLogicalPlan(
&planning_context, parameters, FLAGS_query_cost_planner); &planning_context, &distributed_post_processor, FLAGS_query_cost_planner);
std::vector<storage::Property> properties_by_ix =
NamesToProperties(ast_storage.properties_, db_accessor);
auto plan = MakeDistributedPlan(ast_storage, *tmp_logical_plan, symbol_table,
next_plan_id_, properties_by_ix);
VLOG(10) << "[Interpreter] Created plan for distributed execution " VLOG(10) << "[Interpreter] Created plan for distributed execution "
<< next_plan_id_ - 1; << next_plan_id_ - 1;
return std::make_unique<DistributedLogicalPlan>(std::move(plan), cost, return std::make_unique<DistributedLogicalPlan>(std::move(plan), cost,