Don't distribute single execution queries
Summary: Queries such as `RETURN 1` should only be run on a single machine. This change assumes that a query should only be distributed if it contains at least one `ScanAll` operator, i.e. a `MATCH` clause. Reviewers: florijan, msantl Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1154
This commit is contained in:
parent
3f30a6fad9
commit
471d63ad86
@ -39,6 +39,14 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
|
|||||||
using HierarchicalLogicalOperatorVisitor::PostVisit;
|
using HierarchicalLogicalOperatorVisitor::PostVisit;
|
||||||
using HierarchicalLogicalOperatorVisitor::PreVisit;
|
using HierarchicalLogicalOperatorVisitor::PreVisit;
|
||||||
|
|
||||||
|
// Returns true if the plan should be run on master and workers. Note, that
|
||||||
|
// false is returned if the plan is already split.
|
||||||
|
bool ShouldSplit() {
|
||||||
|
// At the moment, the plan should be run on workers only if we encountered a
|
||||||
|
// ScanAll.
|
||||||
|
return !distributed_plan_.worker_plan && has_scan_all_;
|
||||||
|
}
|
||||||
|
|
||||||
// ScanAll are all done on each machine locally.
|
// ScanAll are all done on each machine locally.
|
||||||
bool PreVisit(ScanAll &) override { return true; }
|
bool PreVisit(ScanAll &) override { return true; }
|
||||||
bool PostVisit(ScanAll &) override {
|
bool PostVisit(ScanAll &) override {
|
||||||
@ -93,7 +101,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
|
|||||||
// 2) Worker plan with operators below Skip, but without Skip itself.
|
// 2) Worker plan with operators below Skip, but without Skip itself.
|
||||||
bool PreVisit(Skip &) override { return true; }
|
bool PreVisit(Skip &) override { return true; }
|
||||||
bool PostVisit(Skip &skip) override {
|
bool PostVisit(Skip &skip) override {
|
||||||
if (!distributed_plan_.worker_plan) {
|
if (ShouldSplit()) {
|
||||||
auto input = skip.input();
|
auto input = skip.input();
|
||||||
distributed_plan_.worker_plan = input;
|
distributed_plan_.worker_plan = input;
|
||||||
skip.set_input(std::make_shared<PullRemote>(
|
skip.set_input(std::make_shared<PullRemote>(
|
||||||
@ -110,7 +118,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
|
|||||||
// 2) Worker plan with operators below Limit, but including Limit itself.
|
// 2) Worker plan with operators below Limit, but including Limit itself.
|
||||||
bool PreVisit(Limit &) override { return true; }
|
bool PreVisit(Limit &) override { return true; }
|
||||||
bool PostVisit(Limit &limit) override {
|
bool PostVisit(Limit &limit) override {
|
||||||
if (!distributed_plan_.worker_plan) {
|
if (ShouldSplit()) {
|
||||||
// Shallow copy Limit
|
// Shallow copy Limit
|
||||||
distributed_plan_.worker_plan = std::make_shared<Limit>(limit);
|
distributed_plan_.worker_plan = std::make_shared<Limit>(limit);
|
||||||
auto input = limit.input();
|
auto input = limit.input();
|
||||||
@ -127,7 +135,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
|
|||||||
bool PreVisit(OrderBy &) override { return true; }
|
bool PreVisit(OrderBy &) override { return true; }
|
||||||
bool PostVisit(OrderBy &order_by) override {
|
bool PostVisit(OrderBy &order_by) override {
|
||||||
// TODO: Associative combination of OrderBy
|
// TODO: Associative combination of OrderBy
|
||||||
if (!distributed_plan_.worker_plan) {
|
if (ShouldSplit()) {
|
||||||
auto input = order_by.input();
|
auto input = order_by.input();
|
||||||
distributed_plan_.worker_plan = input;
|
distributed_plan_.worker_plan = input;
|
||||||
order_by.set_input(std::make_shared<PullRemote>(
|
order_by.set_input(std::make_shared<PullRemote>(
|
||||||
@ -140,7 +148,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
|
|||||||
// Treat Distinct just like Limit.
|
// Treat Distinct just like Limit.
|
||||||
bool PreVisit(Distinct &) override { return true; }
|
bool PreVisit(Distinct &) override { return true; }
|
||||||
bool PostVisit(Distinct &distinct) override {
|
bool PostVisit(Distinct &distinct) override {
|
||||||
if (!distributed_plan_.worker_plan) {
|
if (ShouldSplit()) {
|
||||||
// Shallow copy Distinct
|
// Shallow copy Distinct
|
||||||
distributed_plan_.worker_plan = std::make_shared<Distinct>(distinct);
|
distributed_plan_.worker_plan = std::make_shared<Distinct>(distinct);
|
||||||
auto input = distinct.input();
|
auto input = distinct.input();
|
||||||
@ -167,7 +175,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
|
|||||||
// completely done on master.
|
// completely done on master.
|
||||||
bool PreVisit(Aggregate &) override { return true; }
|
bool PreVisit(Aggregate &) override { return true; }
|
||||||
bool PostVisit(Aggregate &aggr_op) override {
|
bool PostVisit(Aggregate &aggr_op) override {
|
||||||
if (distributed_plan_.worker_plan) {
|
if (!ShouldSplit()) {
|
||||||
// We have already split the plan, so the aggregation we are visiting is
|
// We have already split the plan, so the aggregation we are visiting is
|
||||||
// on master.
|
// on master.
|
||||||
return true;
|
return true;
|
||||||
@ -308,7 +316,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
|
|||||||
if (!master_aggr_) return true;
|
if (!master_aggr_) return true;
|
||||||
// We have to rewire master/worker aggregation.
|
// We have to rewire master/worker aggregation.
|
||||||
DCHECK(worker_aggr_);
|
DCHECK(worker_aggr_);
|
||||||
DCHECK(!distributed_plan_.worker_plan);
|
DCHECK(ShouldSplit());
|
||||||
DCHECK(std::dynamic_pointer_cast<Aggregate>(produce.input()));
|
DCHECK(std::dynamic_pointer_cast<Aggregate>(produce.input()));
|
||||||
distributed_plan_.worker_plan = std::move(worker_aggr_);
|
distributed_plan_.worker_plan = std::move(worker_aggr_);
|
||||||
produce.set_input(std::move(master_aggr_));
|
produce.set_input(std::move(master_aggr_));
|
||||||
@ -360,7 +368,7 @@ DistributedPlan MakeDistributedPlan(const LogicalOperator &original_plan,
|
|||||||
Clone(original_plan);
|
Clone(original_plan);
|
||||||
DistributedPlanner planner(distributed_plan);
|
DistributedPlanner planner(distributed_plan);
|
||||||
distributed_plan.master_plan->Accept(planner);
|
distributed_plan.master_plan->Accept(planner);
|
||||||
if (!distributed_plan.worker_plan) {
|
if (planner.ShouldSplit()) {
|
||||||
// We haven't split the plan, this means that it should be the same on
|
// We haven't split the plan, this means that it should be the same on
|
||||||
// master and worker. We only need to prepend PullRemote to master plan.
|
// master and worker. We only need to prepend PullRemote to master plan.
|
||||||
distributed_plan.worker_plan = std::move(distributed_plan.master_plan);
|
distributed_plan.worker_plan = std::move(distributed_plan.master_plan);
|
||||||
|
@ -617,7 +617,7 @@ DEFCOMMAND(ShowDistributed) {
|
|||||||
distributed_plan.master_plan->Accept(printer);
|
distributed_plan.master_plan->Accept(printer);
|
||||||
std::cout << std::endl;
|
std::cout << std::endl;
|
||||||
}
|
}
|
||||||
{
|
if (distributed_plan.worker_plan) {
|
||||||
std::cout << "---- Worker Plan ---- " << std::endl;
|
std::cout << "---- Worker Plan ---- " << std::endl;
|
||||||
PlanPrinter printer(dba);
|
PlanPrinter printer(dba);
|
||||||
distributed_plan.worker_plan->Accept(printer);
|
distributed_plan.worker_plan->Accept(printer);
|
||||||
|
Loading…
Reference in New Issue
Block a user