From 743d82b78ddfedf7111319c9d08113a538d344eb Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Tue, 22 Jan 2019 16:24:05 +0100 Subject: [PATCH] Use plan rewrite hook to generate DistributedPlan Reviewers: mtomic, llugovic Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1826 --- src/query/distributed_interpreter.cpp | 67 +++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 10 deletions(-) diff --git a/src/query/distributed_interpreter.cpp b/src/query/distributed_interpreter.cpp index c6cdec8d7..2dcb8ef4e 100644 --- a/src/query/distributed_interpreter.cpp +++ b/src/query/distributed_interpreter.cpp @@ -56,6 +56,58 @@ class DistributedLogicalPlan final : public LogicalPlan { double cost_; }; +class DistributedPostProcessor final { + // Original plan before rewrite, needed only for temporary cost estimation + // implementation. + std::unique_ptr original_plan_; + std::atomic *next_plan_id_; + Parameters parameters_; + + public: + using ProcessedPlan = plan::DistributedPlan; + + DistributedPostProcessor(const Parameters ¶meters, + std::atomic *next_plan_id) + : next_plan_id_(next_plan_id), parameters_(parameters) {} + + template + plan::DistributedPlan Rewrite(std::unique_ptr plan, + TPlanningContext *context) { + original_plan_ = std::move(plan); + const auto &property_names = context->ast_storage->properties_; + std::vector properties_by_ix; + properties_by_ix.reserve(property_names.size()); + for (const auto &name : property_names) { + properties_by_ix.push_back(context->db->Property(name)); + } + return MakeDistributedPlan(*context->ast_storage, *original_plan_, + *context->symbol_table, *next_plan_id_, + properties_by_ix); + } + + template + double EstimatePlanCost(const plan::DistributedPlan &plan, + TVertexCounts *vertex_counts) { + // TODO: Make cost estimation work with distributed plan. + return ::query::plan::EstimatePlanCost(vertex_counts, parameters_, + *original_plan_); + } + + template + plan::DistributedPlan MergeWithCombinator(plan::DistributedPlan curr_plan, + plan::DistributedPlan last_plan, + const Tree &combinator, + TPlanningContext *context) { + throw utils::NotYetImplemented("query combinator"); + } + + template + plan::DistributedPlan MakeDistinct(plan::DistributedPlan last_op, + TPlanningContext *context) { + throw utils::NotYetImplemented("query combinator"); + } +}; + } // namespace DistributedInterpreter::DistributedInterpreter(database::Master *db) @@ -65,20 +117,15 @@ std::unique_ptr DistributedInterpreter::MakeLogicalPlan( CypherQuery *query, AstStorage ast_storage, const Parameters ¶meters, database::GraphDbAccessor *db_accessor) { auto vertex_counts = plan::MakeVertexCountCache(db_accessor); - auto symbol_table = MakeSymbolTable(query); - auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, query, &vertex_counts); - - std::unique_ptr tmp_logical_plan; + DistributedPostProcessor distributed_post_processor(parameters, + &next_plan_id_); + plan::DistributedPlan plan; double cost; - std::tie(tmp_logical_plan, cost) = plan::MakeLogicalPlan( - &planning_context, parameters, FLAGS_query_cost_planner); - std::vector properties_by_ix = - NamesToProperties(ast_storage.properties_, db_accessor); - auto plan = MakeDistributedPlan(ast_storage, *tmp_logical_plan, symbol_table, - next_plan_id_, properties_by_ix); + std::tie(plan, cost) = plan::MakeLogicalPlan( + &planning_context, &distributed_post_processor, FLAGS_query_cost_planner); VLOG(10) << "[Interpreter] Created plan for distributed execution " << next_plan_id_ - 1; return std::make_unique(std::move(plan), cost,