From 760c6246d82b08f4732bee49b5d0003bbdb0d525 Mon Sep 17 00:00:00 2001
From: Teon Banek
Date: Fri, 26 Jan 2018 16:22:59 +0100
Subject: [PATCH] Make and dispatch worker plans on distributed master

Reviewers: florijan, msantl

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1144
---
 src/distributed/plan_dispatcher.cpp |  4 ++--
 src/distributed/plan_dispatcher.hpp |  2 +-
 src/query/interpreter.cpp           | 32 +++++++++++++++++++++++------
 src/query/interpreter.hpp           | 21 +++++++++++--------
 src/query/plan/distributed.cpp      | 29 +++++++++++++-------------
 src/query/plan/operator.hpp         | 16 +++++++++++++--
 6 files changed, 70 insertions(+), 34 deletions(-)

diff --git a/src/distributed/plan_dispatcher.cpp b/src/distributed/plan_dispatcher.cpp
index 681916009..7dc92f1ff 100644
--- a/src/distributed/plan_dispatcher.cpp
+++ b/src/distributed/plan_dispatcher.cpp
@@ -7,9 +7,9 @@ PlanDispatcher::PlanDispatcher(Coordination &coordination)
 
 void PlanDispatcher::DispatchPlan(
     int64_t plan_id, std::shared_ptr<query::plan::LogicalOperator> plan,
-    SymbolTable &symbol_table) {
+    const SymbolTable &symbol_table) {
   auto futures = clients_.ExecuteOnWorkers<void>(
-      0, [plan_id, &plan, &symbol_table](communication::rpc::Client &client) {
+      0, [plan_id, plan, symbol_table](communication::rpc::Client &client) {
         auto result = client.Call(plan_id, plan, symbol_table);
         CHECK(result) << "Failed to dispatch plan to worker";
       });
diff --git a/src/distributed/plan_dispatcher.hpp b/src/distributed/plan_dispatcher.hpp
index 042be0bd8..c08aeb21a 100644
--- a/src/distributed/plan_dispatcher.hpp
+++ b/src/distributed/plan_dispatcher.hpp
@@ -21,7 +21,7 @@ class PlanDispatcher {
    */
   void DispatchPlan(int64_t plan_id,
                     std::shared_ptr<query::plan::LogicalOperator> plan,
-                    SymbolTable &symbol_table);
+                    const SymbolTable &symbol_table);
 
  private:
  RpcWorkerClients clients_;
diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index a7862461b..229e0c7ca 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -2,11 +2,12 @@
 
 #include <glog/logging.h>
 
-#include "query/exceptions.hpp"
+#include "distributed/plan_dispatcher.hpp"
 #include "query/exceptions.hpp"
 #include "query/frontend/ast/cypher_main_visitor.hpp"
 #include "query/frontend/opencypher/parser.hpp"
 #include "query/frontend/semantic/symbol_generator.hpp"
+#include "query/plan/distributed.hpp"
 #include "query/plan/planner.hpp"
 #include "query/plan/vertex_count_cache.hpp"
 #include "utils/flag_validation.hpp"
@@ -74,12 +75,30 @@ Interpreter::Results Interpreter::operator()(
       std::tie(tmp_logical_plan, query_plan_cost_estimation) =
           MakeLogicalPlan(ast_storage, db_accessor, ctx);
-      plan = std::make_shared<CachedPlan>(
-          std::move(tmp_logical_plan), query_plan_cost_estimation,
-          ctx.symbol_table_, std::move(ast_storage));
+      DCHECK(db_accessor.db().type() !=
+             database::GraphDb::Type::DISTRIBUTED_WORKER);
+      if (db_accessor.db().type() ==
+          database::GraphDb::Type::DISTRIBUTED_MASTER) {
+        auto distributed_plan = MakeDistributedPlan(
+            *tmp_logical_plan, ctx.symbol_table_, next_plan_id_);
+        plan = std::make_shared<CachedPlan>(std::move(distributed_plan),
+                                            query_plan_cost_estimation);
+      } else {
+        plan = std::make_shared<CachedPlan>(
+            std::move(tmp_logical_plan), query_plan_cost_estimation,
+            ctx.symbol_table_, std::move(ast_storage));
+      }
       if (FLAGS_query_plan_cache) {
-        plan_cache_.access().insert(stripped.hash(), plan);
+        plan = plan_cache_.access().insert(stripped.hash(), plan).first->second;
       }
+
+      // Dispatch plans to workers (if we have any for them).
+      if (plan->distributed_plan().worker_plan) {
+        auto &dispatcher = db_accessor.db().plan_dispatcher();
+        dispatcher.DispatchPlan(plan->distributed_plan().plan_id,
+                                plan->distributed_plan().worker_plan,
+                                plan->symbol_table());
+      }
     }
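Note: the plan cache insert deliberately keeps the returned entry. The
insert() used above yields a pair of (iterator, inserted); on a hash
collision the iterator points at the plan some other interpreter already
cached, and that plan, with its already-dispatched plan_id, must win over
the freshly built one. A minimal sketch of the same insert-or-reuse
pattern, using std::map instead of memgraph's ConcurrentMap accessor (the
names below are illustrative only):

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <memory>

    struct Plan {
      int64_t id;
    };

    int main() {
      std::map<uint64_t, std::shared_ptr<Plan>> cache;
      auto theirs = std::make_shared<Plan>(Plan{0});
      auto mine = std::make_shared<Plan>(Plan{1});
      cache.emplace(42, theirs);  // another thread cached this hash first
      // emplace() returns {iterator, inserted}; on collision the iterator
      // points at the existing entry, so we adopt it and drop our own plan.
      mine = cache.emplace(42, mine).first->second;
      std::cout << mine->id << "\n";  // prints 0: the cached plan won
      return 0;
    }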
@@ -100,7 +119,8 @@ Interpreter::Results Interpreter::operator()(
          dynamic_cast(logical_plan) ||
          dynamic_cast(logical_plan) ||
          dynamic_cast(logical_plan) ||
-         dynamic_cast(logical_plan))
+         dynamic_cast(logical_plan) ||
+         dynamic_cast(logical_plan))
       << "Unknown top level LogicalOperator";
 
   ctx.symbol_table_ = plan->symbol_table();
diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp
index f9e294e21..bddcf271f 100644
--- a/src/query/interpreter.hpp
+++ b/src/query/interpreter.hpp
@@ -12,6 +12,7 @@
 #include "query/frontend/ast/ast.hpp"
 #include "query/frontend/stripped.hpp"
 #include "query/interpret/frame.hpp"
+#include "query/plan/distributed.hpp"
 #include "query/plan/operator.hpp"
 #include "threading/sync/spinlock.hpp"
 #include "utils/timer.hpp"
@@ -24,16 +25,19 @@ class Interpreter {
  private:
   class CachedPlan {
    public:
+    explicit CachedPlan(plan::DistributedPlan distributed_plan, double cost)
+        : distributed_plan_(std::move(distributed_plan)), cost_(cost) {}
+
     CachedPlan(std::unique_ptr<plan::LogicalOperator> plan, double cost,
                SymbolTable symbol_table, AstTreeStorage storage)
-        : plan_(std::move(plan)),
-          cost_(cost),
-          symbol_table_(symbol_table),
-          ast_storage_(std::move(storage)) {}
+        : distributed_plan_{0, std::move(plan), nullptr, std::move(storage),
+                            symbol_table},
+          cost_(cost) {}
 
-    const auto &plan() const { return *plan_; }
+    const auto &plan() const { return *distributed_plan_.master_plan; }
+    const auto &distributed_plan() const { return distributed_plan_; }
     double cost() const { return cost_; }
-    const auto &symbol_table() const { return symbol_table_; }
+    const auto &symbol_table() const { return distributed_plan_.symbol_table; }
 
     bool IsExpired() const {
       auto elapsed = cache_timer_.Elapsed();
@@ -42,10 +46,8 @@ class Interpreter {
     };
 
    private:
-    std::unique_ptr<plan::LogicalOperator> plan_;
+    plan::DistributedPlan distributed_plan_;
     double cost_;
-    SymbolTable symbol_table_;
-    AstTreeStorage ast_storage_;
     utils::Timer cache_timer_;
   };
 
@@ -170,6 +172,7 @@ class Interpreter {
 
   ConcurrentMap<HashType, AstTreeStorage> ast_cache_;
   ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> plan_cache_;
+  std::atomic<int64_t> next_plan_id_{0};
   // Antlr has singleton instance that is shared between threads. It is
   // protected by locks inside of antlr. Unfortunately, they are not protected
   // in a very good way. Once we have antlr version without race conditions we
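Note: both CachedPlan constructors now funnel into a single
plan::DistributedPlan member; a single-node plan is just the degenerate
case with plan_id 0 and no worker plan, which is what the
`if (plan->distributed_plan().worker_plan)` dispatch guard above relies
on. Judging from the braced initializer and the accessors used in this
patch, the struct has roughly this shape (field order and exact types are
inferred here, not copied from query/plan/distributed.hpp):

    // Sketch only; LogicalOperator, AstTreeStorage and SymbolTable are
    // the existing memgraph types.
    struct DistributedPlan {
      int64_t plan_id;                // 0 for single-node plans
      std::unique_ptr<LogicalOperator> master_plan;
      std::shared_ptr<LogicalOperator> worker_plan;  // nullptr: nothing to dispatch
      AstTreeStorage ast_storage;     // owns the expressions both plans reference
      SymbolTable symbol_table;       // shared by master and worker plans
    };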
diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp
index e41dff9cd..be72f7cd5 100644
--- a/src/query/plan/distributed.cpp
+++ b/src/query/plan/distributed.cpp
@@ -196,14 +196,15 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
     }
     // Aggregate uses associative operation(s), so split the work across master
     // and workers.
-    auto make_merge_aggregation = [this](auto op, const auto &name) {
-      auto *worker_count_ident =
-          distributed_plan_.ast_storage.Create<Identifier>(name);
-      auto sum_name = Aggregation::OpToString(op) +
-                      std::to_string(worker_count_ident->uid());
-      auto sum_sym = distributed_plan_.symbol_table.CreateSymbol(
-          sum_name, false, Symbol::Type::Number);
-      return Aggregate::Element{worker_count_ident, nullptr, op, sum_sym};
+    auto make_merge_aggregation = [this](auto op, const auto &worker_sym) {
+      auto *worker_ident =
+          distributed_plan_.ast_storage.Create<Identifier>(worker_sym.name());
+      distributed_plan_.symbol_table[*worker_ident] = worker_sym;
+      auto merge_name =
+          Aggregation::OpToString(op) + std::to_string(worker_ident->uid());
+      auto merge_sym = distributed_plan_.symbol_table.CreateSymbol(
+          merge_name, false, Symbol::Type::Number);
+      return Aggregate::Element{worker_ident, nullptr, op, merge_sym};
     };
     std::vector<Aggregate::Element> master_aggrs;
     master_aggrs.reserve(aggr_op.aggregations().size());
@@ -212,13 +213,13 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
         // Count, like sum, only needs to sum all of the results on master.
         case Aggregation::Op::COUNT:
         case Aggregation::Op::SUM:
-          master_aggrs.emplace_back(make_merge_aggregation(
-              Aggregation::Op::SUM, aggr.output_sym.name()));
+          master_aggrs.emplace_back(
+              make_merge_aggregation(Aggregation::Op::SUM, aggr.output_sym));
           break;
         case Aggregation::Op::MIN:
         case Aggregation::Op::MAX:
           master_aggrs.emplace_back(
-              make_merge_aggregation(aggr.op, aggr.output_sym.name()));
+              make_merge_aggregation(aggr.op, aggr.output_sym));
           break;
         default:
           throw utils::NotYetImplemented("distributed planning");
@@ -253,11 +254,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
     std::vector<NamedExpression *> produce_exprs;
     produce_exprs.reserve(aggr_op->aggregations().size());
     for (int i = 0; i < aggr_op->aggregations().size(); ++i) {
-      const auto &final_result_sym = master_aggrs_[i].output_sym;
+      const auto &merge_result_sym = master_aggrs_[i].output_sym;
       const auto &original_result_sym = aggr_op->aggregations()[i].output_sym;
       auto *ident = distributed_plan_.ast_storage.Create<Identifier>(
-          final_result_sym.name());
-      distributed_plan_.symbol_table[*ident] = final_result_sym;
+          merge_result_sym.name());
+      distributed_plan_.symbol_table[*ident] = merge_result_sym;
       auto *nexpr = distributed_plan_.ast_storage.Create<NamedExpression>(
           original_result_sym.name(), ident);
       distributed_plan_.symbol_table[*nexpr] = original_result_sym;
diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp
index b8694bef4..d710c777f 100644
--- a/src/query/plan/operator.hpp
+++ b/src/query/plan/operator.hpp
@@ -1029,11 +1029,20 @@ class Produce : public LogicalOperator {
   Produce() {}
 
   friend class boost::serialization::access;
+  BOOST_SERIALIZATION_SPLIT_MEMBER();
+
   template <class TArchive>
-  void serialize(TArchive &ar, const unsigned int) {
+  void save(TArchive &ar, const unsigned int) const {
     ar &boost::serialization::base_object<LogicalOperator>(*this);
     ar &input_;
-    ar &named_expressions_;
+    SavePointers(ar, named_expressions_);
+  }
+
+  template <class TArchive>
+  void load(TArchive &ar, const unsigned int) {
+    ar &boost::serialization::base_object<LogicalOperator>(*this);
+    ar &input_;
+    LoadPointers(ar, named_expressions_);
   }
 };
@@ -2267,6 +2276,9 @@ class PullRemote : public LogicalOperator {
   std::unique_ptr<Cursor> MakeCursor(
       database::GraphDbAccessor &db) const override;
 
+  std::vector<Symbol> OutputSymbols(const SymbolTable &) const override {
+    return symbols_;
+  }
   const auto &symbols() const { return symbols_; }
   auto plan_id() const { return plan_id_; }
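Note on the aggregation split in distributed.cpp: the rewrite works because
the chosen merge operations are associative. Each worker computes a partial
aggregate over its part of the graph; the master then folds the partials,
summing COUNT/SUM partials and re-applying MIN/MAX to MIN/MAX partials. A
self-contained sketch of just the merge rules, with made-up partials from
three hypothetical workers:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
      // Partial aggregates computed independently on three workers.
      std::vector<int64_t> counts{10, 7, 25};  // worker-local COUNT(n)
      std::vector<int64_t> mins{4, -2, 9};     // worker-local MIN(n.x)

      // Master-side merge: COUNT partials combine with SUM...
      int64_t count = 0;
      for (auto c : counts) count += c;

      // ...while MIN partials combine with MIN again (MAX is analogous).
      int64_t min = mins.front();
      for (auto m : mins) min = std::min(min, m);

      std::cout << "count: " << count << ", min: " << min << "\n";
      // AVG has no such single-pass associative merge (an average of
      // averages is wrong without weights), hence the NotYetImplemented
      // thrown for the remaining ops.
      return 0;
    }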
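Note on the Produce serialization change: serialize() is replaced by a
save()/load() pair because named_expressions_ holds raw pointers that the
SavePointers/LoadPointers helpers presumably have to re-create inside the
AST storage on the receiving worker, so loading cannot simply mirror
saving. BOOST_SERIALIZATION_SPLIT_MEMBER() is the stock Boost macro that
generates a serialize() dispatching to the pair. A standalone example of
the mechanism on a toy class (not the Produce operator):

    #include <iostream>
    #include <sstream>

    #include <boost/archive/text_iarchive.hpp>
    #include <boost/archive/text_oarchive.hpp>
    #include <boost/serialization/split_member.hpp>

    class Sample {
     public:
      Sample() = default;
      explicit Sample(int value) : value_(value) {}
      int value() const { return value_; }

     private:
      friend class boost::serialization::access;
      // Generates serialize(), routing to save() or load() depending on
      // whether the archive is an output or an input archive.
      BOOST_SERIALIZATION_SPLIT_MEMBER();

      template <class TArchive>
      void save(TArchive &ar, const unsigned int) const {
        ar & value_;
      }

      template <class TArchive>
      void load(TArchive &ar, const unsigned int) {
        ar & value_;
      }

      int value_ = 0;
    };

    int main() {
      std::stringstream stream;
      {
        boost::archive::text_oarchive out(stream);
        const Sample original(42);
        out << original;  // calls save()
      }
      boost::archive::text_iarchive in(stream);
      Sample restored;
      in >> restored;  // calls load()
      std::cout << restored.value() << "\n";  // prints 42
      return 0;
    }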