Make and dispatch worker plans on distributed master

Reviewers: florijan, msantl

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1144
This commit is contained in:
Teon Banek 2018-01-26 16:22:59 +01:00
parent a73a4c3762
commit 760c6246d8
6 changed files with 70 additions and 34 deletions

View File

@ -7,9 +7,9 @@ PlanDispatcher::PlanDispatcher(Coordination &coordination)
void PlanDispatcher::DispatchPlan(
int64_t plan_id, std::shared_ptr<query::plan::LogicalOperator> plan,
SymbolTable &symbol_table) {
const SymbolTable &symbol_table) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [plan_id, &plan, &symbol_table](communication::rpc::Client &client) {
0, [plan_id, plan, symbol_table](communication::rpc::Client &client) {
auto result =
client.Call<DistributedPlanRpc>(plan_id, plan, symbol_table);
CHECK(result) << "Failed to dispatch plan to worker";

View File

@ -21,7 +21,7 @@ class PlanDispatcher {
*/
void DispatchPlan(int64_t plan_id,
std::shared_ptr<query::plan::LogicalOperator> plan,
SymbolTable &symbol_table);
const SymbolTable &symbol_table);
private:
RpcWorkerClients clients_;

View File

@ -2,11 +2,12 @@
#include <glog/logging.h>
#include "query/exceptions.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/opencypher/parser.hpp"
#include "query/frontend/semantic/symbol_generator.hpp"
#include "query/plan/distributed.hpp"
#include "query/plan/planner.hpp"
#include "query/plan/vertex_count_cache.hpp"
#include "utils/flag_validation.hpp"
@ -74,12 +75,30 @@ Interpreter::Results Interpreter::operator()(
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
MakeLogicalPlan(ast_storage, db_accessor, ctx);
DCHECK(db_accessor.db().type() !=
database::GraphDb::Type::DISTRIBUTED_WORKER);
if (db_accessor.db().type() ==
database::GraphDb::Type::DISTRIBUTED_MASTER) {
auto distributed_plan = MakeDistributedPlan(
*tmp_logical_plan, ctx.symbol_table_, next_plan_id_);
plan = std::make_shared<CachedPlan>(std::move(distributed_plan),
query_plan_cost_estimation);
} else {
plan = std::make_shared<CachedPlan>(
std::move(tmp_logical_plan), query_plan_cost_estimation,
ctx.symbol_table_, std::move(ast_storage));
}
if (FLAGS_query_plan_cache) {
plan_cache_.access().insert(stripped.hash(), plan);
plan = plan_cache_.access().insert(stripped.hash(), plan).first->second;
}
// Dispatch plans to workers (if we have any for them).
if (plan->distributed_plan().worker_plan) {
auto &dispatcher = db_accessor.db().plan_dispatcher();
dispatcher.DispatchPlan(plan->distributed_plan().plan_id,
plan->distributed_plan().worker_plan,
plan->symbol_table());
}
}
@ -100,7 +119,8 @@ Interpreter::Results Interpreter::operator()(
dynamic_cast<const plan::RemoveLabels *>(logical_plan) ||
dynamic_cast<const plan::Delete *>(logical_plan) ||
dynamic_cast<const plan::Merge *>(logical_plan) ||
dynamic_cast<const plan::CreateIndex *>(logical_plan))
dynamic_cast<const plan::CreateIndex *>(logical_plan) ||
dynamic_cast<const plan::PullRemote *>(logical_plan))
<< "Unknown top level LogicalOperator";
ctx.symbol_table_ = plan->symbol_table();

View File

@ -12,6 +12,7 @@
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/stripped.hpp"
#include "query/interpret/frame.hpp"
#include "query/plan/distributed.hpp"
#include "query/plan/operator.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/timer.hpp"
@ -24,16 +25,19 @@ class Interpreter {
private:
class CachedPlan {
public:
explicit CachedPlan(plan::DistributedPlan distributed_plan, double cost)
: distributed_plan_(std::move(distributed_plan)), cost_(cost) {}
CachedPlan(std::unique_ptr<plan::LogicalOperator> plan, double cost,
SymbolTable symbol_table, AstTreeStorage storage)
: plan_(std::move(plan)),
cost_(cost),
symbol_table_(symbol_table),
ast_storage_(std::move(storage)) {}
: distributed_plan_{0, std::move(plan), nullptr, std::move(storage),
symbol_table},
cost_(cost) {}
const auto &plan() const { return *plan_; }
const auto &plan() const { return *distributed_plan_.master_plan; }
const auto &distributed_plan() const { return distributed_plan_; }
double cost() const { return cost_; }
const auto &symbol_table() const { return symbol_table_; }
const auto &symbol_table() const { return distributed_plan_.symbol_table; }
bool IsExpired() const {
auto elapsed = cache_timer_.Elapsed();
@ -42,10 +46,8 @@ class Interpreter {
};
private:
std::unique_ptr<plan::LogicalOperator> plan_;
plan::DistributedPlan distributed_plan_;
double cost_;
SymbolTable symbol_table_;
AstTreeStorage ast_storage_;
utils::Timer cache_timer_;
};
@ -170,6 +172,7 @@ class Interpreter {
ConcurrentMap<HashType, AstTreeStorage> ast_cache_;
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> plan_cache_;
std::atomic<int64_t> next_plan_id_{0};
// Antlr has singleton instance that is shared between threads. It is
// protected by locks inside of antlr. Unfortunately, they are not protected
// in a very good way. Once we have antlr version without race conditions we

View File

@ -196,14 +196,15 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
// Aggregate uses associative operation(s), so split the work across master
// and workers.
auto make_merge_aggregation = [this](auto op, const auto &name) {
auto *worker_count_ident =
distributed_plan_.ast_storage.Create<Identifier>(name);
auto sum_name = Aggregation::OpToString(op) +
std::to_string(worker_count_ident->uid());
auto sum_sym = distributed_plan_.symbol_table.CreateSymbol(
sum_name, false, Symbol::Type::Number);
return Aggregate::Element{worker_count_ident, nullptr, op, sum_sym};
auto make_merge_aggregation = [this](auto op, const auto &worker_sym) {
auto *worker_ident =
distributed_plan_.ast_storage.Create<Identifier>(worker_sym.name());
distributed_plan_.symbol_table[*worker_ident] = worker_sym;
auto merge_name =
Aggregation::OpToString(op) + std::to_string(worker_ident->uid());
auto merge_sym = distributed_plan_.symbol_table.CreateSymbol(
merge_name, false, Symbol::Type::Number);
return Aggregate::Element{worker_ident, nullptr, op, merge_sym};
};
std::vector<Aggregate::Element> master_aggrs;
master_aggrs.reserve(aggr_op.aggregations().size());
@ -212,13 +213,13 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
// Count, like sum, only needs to sum all of the results on master.
case Aggregation::Op::COUNT:
case Aggregation::Op::SUM:
master_aggrs.emplace_back(make_merge_aggregation(
Aggregation::Op::SUM, aggr.output_sym.name()));
master_aggrs.emplace_back(
make_merge_aggregation(Aggregation::Op::SUM, aggr.output_sym));
break;
case Aggregation::Op::MIN:
case Aggregation::Op::MAX:
master_aggrs.emplace_back(
make_merge_aggregation(aggr.op, aggr.output_sym.name()));
make_merge_aggregation(aggr.op, aggr.output_sym));
break;
default:
throw utils::NotYetImplemented("distributed planning");
@ -253,11 +254,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
std::vector<NamedExpression *> produce_exprs;
produce_exprs.reserve(aggr_op->aggregations().size());
for (int i = 0; i < aggr_op->aggregations().size(); ++i) {
const auto &final_result_sym = master_aggrs_[i].output_sym;
const auto &merge_result_sym = master_aggrs_[i].output_sym;
const auto &original_result_sym = aggr_op->aggregations()[i].output_sym;
auto *ident = distributed_plan_.ast_storage.Create<Identifier>(
final_result_sym.name());
distributed_plan_.symbol_table[*ident] = final_result_sym;
merge_result_sym.name());
distributed_plan_.symbol_table[*ident] = merge_result_sym;
auto *nexpr = distributed_plan_.ast_storage.Create<NamedExpression>(
original_result_sym.name(), ident);
distributed_plan_.symbol_table[*nexpr] = original_result_sym;

View File

@ -1029,11 +1029,20 @@ class Produce : public LogicalOperator {
Produce() {}
friend class boost::serialization::access;
BOOST_SERIALIZATION_SPLIT_MEMBER();
template <class TArchive>
void serialize(TArchive &ar, const unsigned int) {
void save(TArchive &ar, const unsigned int) const {
ar &boost::serialization::base_object<LogicalOperator>(*this);
ar &input_;
ar &named_expressions_;
SavePointers(ar, named_expressions_);
}
template <class TArchive>
void load(TArchive &ar, const unsigned int) {
ar &boost::serialization::base_object<LogicalOperator>(*this);
ar &input_;
LoadPointers(ar, named_expressions_);
}
};
@ -2267,6 +2276,9 @@ class PullRemote : public LogicalOperator {
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override {
return symbols_;
}
const auto &symbols() const { return symbols_; }
auto plan_id() const { return plan_id_; }