diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp index dece15692..fb6d80a32 100644 --- a/src/query/plan/distributed.cpp +++ b/src/query/plan/distributed.cpp @@ -39,6 +39,14 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { using HierarchicalLogicalOperatorVisitor::PostVisit; using HierarchicalLogicalOperatorVisitor::PreVisit; + // Returns true if the plan should be run on master and workers. Note, that + // false is returned if the plan is already split. + bool ShouldSplit() { + // At the moment, the plan should be run on workers only if we encountered a + // ScanAll. + return !distributed_plan_.worker_plan && has_scan_all_; + } + // ScanAll are all done on each machine locally. bool PreVisit(ScanAll &) override { return true; } bool PostVisit(ScanAll &) override { @@ -93,7 +101,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { // 2) Worker plan with operators below Skip, but without Skip itself. bool PreVisit(Skip &) override { return true; } bool PostVisit(Skip &skip) override { - if (!distributed_plan_.worker_plan) { + if (ShouldSplit()) { auto input = skip.input(); distributed_plan_.worker_plan = input; skip.set_input(std::make_shared( @@ -110,7 +118,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { // 2) Worker plan with operators below Limit, but including Limit itself. bool PreVisit(Limit &) override { return true; } bool PostVisit(Limit &limit) override { - if (!distributed_plan_.worker_plan) { + if (ShouldSplit()) { // Shallow copy Limit distributed_plan_.worker_plan = std::make_shared(limit); auto input = limit.input(); @@ -127,7 +135,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { bool PreVisit(OrderBy &) override { return true; } bool PostVisit(OrderBy &order_by) override { // TODO: Associative combination of OrderBy - if (!distributed_plan_.worker_plan) { + if (ShouldSplit()) { auto input = order_by.input(); distributed_plan_.worker_plan = input; order_by.set_input(std::make_shared( @@ -140,7 +148,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { // Treat Distinct just like Limit. bool PreVisit(Distinct &) override { return true; } bool PostVisit(Distinct &distinct) override { - if (!distributed_plan_.worker_plan) { + if (ShouldSplit()) { // Shallow copy Distinct distributed_plan_.worker_plan = std::make_shared(distinct); auto input = distinct.input(); @@ -167,7 +175,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { // completely done on master. bool PreVisit(Aggregate &) override { return true; } bool PostVisit(Aggregate &aggr_op) override { - if (distributed_plan_.worker_plan) { + if (!ShouldSplit()) { // We have already split the plan, so the aggregation we are visiting is // on master. return true; @@ -308,7 +316,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { if (!master_aggr_) return true; // We have to rewire master/worker aggregation. DCHECK(worker_aggr_); - DCHECK(!distributed_plan_.worker_plan); + DCHECK(ShouldSplit()); DCHECK(std::dynamic_pointer_cast(produce.input())); distributed_plan_.worker_plan = std::move(worker_aggr_); produce.set_input(std::move(master_aggr_)); @@ -360,7 +368,7 @@ DistributedPlan MakeDistributedPlan(const LogicalOperator &original_plan, Clone(original_plan); DistributedPlanner planner(distributed_plan); distributed_plan.master_plan->Accept(planner); - if (!distributed_plan.worker_plan) { + if (planner.ShouldSplit()) { // We haven't split the plan, this means that it should be the same on // master and worker. We only need to prepend PullRemote to master plan. distributed_plan.worker_plan = std::move(distributed_plan.master_plan); diff --git a/tests/manual/query_planner.cpp b/tests/manual/query_planner.cpp index 64c4c8bd5..502109824 100644 --- a/tests/manual/query_planner.cpp +++ b/tests/manual/query_planner.cpp @@ -617,7 +617,7 @@ DEFCOMMAND(ShowDistributed) { distributed_plan.master_plan->Accept(printer); std::cout << std::endl; } - { + if (distributed_plan.worker_plan) { std::cout << "---- Worker Plan ---- " << std::endl; PlanPrinter printer(dba); distributed_plan.worker_plan->Accept(printer);