Plan basic distributed Cartesian without any checks

Summary:
This is still very much in progress.  No advanced checks are done to
prevent planning unimplemented things.  Basic Cartesian product should
work, for example `MATCH (a), (b) CREATE (a)-[:r]->(c)-[:r]->(b)`. But
anything more advanced may lead to undefined behaviour of the planner
and therefore execution. Use at your own risk!

Add ModifiedSymbols method to LogicalOperator

For planning Cartesian, we need information on which symbols are filled
by operator sub-trees.  Currently, this is used to set symbols which
should be transferred over network. Later, they should be used to detect
whether filter expressions use symbols modified from Cartesian branches.
Then we will be able to ensure correct dependency of filters and their
behaviour.

Prepare DistributedPlan for multiple worker plans

Since Cartesian branches need to be split and handled by each worker, we
now dispatch multiple plans to workers.

Reviewers: florijan, msantl, buda

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1208
This commit is contained in:
Teon Banek 2018-02-20 11:08:43 +01:00
parent 8856682c7f
commit 2055176139
8 changed files with 707 additions and 210 deletions

View File

@ -90,16 +90,16 @@ Interpreter::Results Interpreter::operator()(
}
// Dispatch plans to workers (if we have any for them).
if (plan->distributed_plan().worker_plan) {
for (const auto &plan_pair : plan->distributed_plan().worker_plans) {
const auto &plan_id = plan_pair.first;
const auto &worker_plan = plan_pair.second;
auto &dispatcher = db_accessor.db().plan_dispatcher();
dispatcher.DispatchPlan(plan->distributed_plan().plan_id,
plan->distributed_plan().worker_plan,
plan->symbol_table());
dispatcher.DispatchPlan(plan_id, worker_plan, plan->symbol_table());
}
if (FLAGS_query_plan_cache) {
// TODO: If the same plan was already cached, invalidate the dispatched
// plan (above) from workers.
// plans (above) from workers.
plan = plan_cache_.access().insert(stripped.hash(), plan).first->second;
}
}

View File

@ -30,7 +30,7 @@ class Interpreter {
CachedPlan(std::unique_ptr<plan::LogicalOperator> plan, double cost,
SymbolTable symbol_table, AstTreeStorage storage)
: distributed_plan_{0, std::move(plan), nullptr, std::move(storage),
: distributed_plan_{0, std::move(plan), {}, std::move(storage),
symbol_table},
cost_(cost) {}

View File

@ -32,33 +32,117 @@ std::pair<std::unique_ptr<LogicalOperator>, AstTreeStorage> Clone(
AstTreeStorage::kHelperId))};
}
int64_t AddWorkerPlan(DistributedPlan &distributed_plan,
std::atomic<int64_t> &next_plan_id,
const std::shared_ptr<LogicalOperator> &worker_plan) {
int64_t plan_id = next_plan_id++;
distributed_plan.worker_plans.emplace_back(plan_id, worker_plan);
return plan_id;
}
class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
public:
DistributedPlanner(DistributedPlan &distributed_plan)
: distributed_plan_(distributed_plan) {}
DistributedPlanner(DistributedPlan &distributed_plan,
std::atomic<int64_t> &next_plan_id)
: distributed_plan_(distributed_plan), next_plan_id_(next_plan_id) {}
using HierarchicalLogicalOperatorVisitor::PostVisit;
using HierarchicalLogicalOperatorVisitor::PreVisit;
// Returns true if the plan should be run on master and workers. Note, that
// false is returned if the plan is already split.
bool ShouldSplit() const {
// At the moment, the plan should be run on workers only if we encountered a
// ScanAll.
return !distributed_plan_.worker_plan && has_scan_all_;
}
bool ShouldSplit() const { return should_split_; }
bool NeedsSynchronize() const { return needs_synchronize_; }
// ScanAll are all done on each machine locally.
// We need special care when multiple ScanAll operators appear, this means we
// need a Cartesian product. Both the left and the right side of Cartesian
// should be sent as standalone plans to each worker. Then, the master
// execution should use PullRemote to wire them into Cartesian. In case of
// multiple Cartesians, we send each ScanAll part to workers and chain them
// into multiple Cartesians on master.
//
// For example, `ScanAll(n) > ScanAll(m) > ScanAll(l)` is transformed to:
//
// workers | master
//
// * ScanAll(n) - - - - - - - - - \
// + PullRemote(n) \
// * ScanAll(m) \ + Cartesian
// +- PullRemote(m) \ /
// + Cartesian - -
// +- PullRemote(n) /
// * ScanAll(l) /
//
// Things get more complicated if any branch of the Cartesian has a Filter
// operator which depends on the result from another branch.
//
// For example, `ScanAll (n) > ScanAll (m) > Filter (m.prop = n.prop)`:
//
// workers | master
//
// * ScanAll(n) - - - - - - - - - - - - - - \
// + PullRemote(n) \
// + Cartesian
// + PullRemote(m) /
// * ScanAll(m) - - Filter (m.prop = n.prop) /
//
// Since the Filter depends on the first ScanAll branch, we can either:
// * enforce the first branch is evaluated before and data sent back to
// workers to evaluate the second; or
// * move the Filter after the Cartesian (or maybe inline it inside).
//
// Inserting the Cartesian operator is done through PlanCartesian while
// post-visiting Produce, Aggregate or write operators.
//
// TODO: Consider planning Cartesian during regular planning in
// RuleBasedPlanner.
// TODO: Finish Cartesian planning:
// * checking Filters can be used;
// * checking ExpandUniquenessFilter can be used;
// * checking indexed ScanAll can be used;
// * checking Expand into existing can be used;
// * allowing Cartesian after Produce (i.e. after WITH clause);
// * ...
std::shared_ptr<Cartesian> PlanCartesian(
const std::shared_ptr<LogicalOperator> &rhs_input) {
std::shared_ptr<Cartesian> cartesian;
auto pull_id = AddWorkerPlan(rhs_input);
std::shared_ptr<LogicalOperator> right_op = std::make_shared<PullRemote>(
rhs_input, pull_id,
rhs_input->ModifiedSymbols(distributed_plan_.symbol_table));
// We use this ordering of operators, so that left hand side can be
// accumulated without having whole product accumulations. This (obviously)
// relies on the fact that Cartesian accumulation strategy accumulates the
// left operator input.
while (!left_cartesians_.empty()) {
auto left_op = left_cartesians_.back();
left_cartesians_.pop_back();
cartesian = std::make_shared<Cartesian>(
left_op, left_op->ModifiedSymbols(distributed_plan_.symbol_table),
right_op, right_op->ModifiedSymbols(distributed_plan_.symbol_table));
right_op = cartesian;
}
return cartesian;
};
bool PreVisit(ScanAll &scan) override {
prev_ops_.push_back(&scan);
return true;
}
bool PostVisit(ScanAll &) override {
bool PostVisit(ScanAll &scan) override {
prev_ops_.pop_back();
RaiseIfCartesian();
RaiseIfHasWorkerPlan();
should_split_ = true;
if (has_scan_all_) {
// Prepare for Cartesian planning
auto id = AddWorkerPlan(scan.input());
auto left_op = std::make_shared<PullRemote>(
scan.input(), id,
scan.input()->ModifiedSymbols(distributed_plan_.symbol_table));
left_cartesians_.push_back(left_op);
scan.set_input(std::make_shared<Once>());
}
has_scan_all_ = true;
return true;
}
@ -67,10 +151,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&scan);
return true;
}
bool PostVisit(ScanAllByLabel &) override {
bool PostVisit(ScanAllByLabel &scan) override {
prev_ops_.pop_back();
RaiseIfCartesian();
RaiseIfHasWorkerPlan();
should_split_ = true;
if (has_scan_all_) {
// Prepare for Cartesian planning
auto id = AddWorkerPlan(scan.input());
auto left_op = std::make_shared<PullRemote>(
scan.input(), id,
scan.input()->ModifiedSymbols(distributed_plan_.symbol_table));
left_cartesians_.push_back(left_op);
scan.set_input(std::make_shared<Once>());
}
has_scan_all_ = true;
return true;
}
@ -78,10 +170,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&scan);
return true;
}
bool PostVisit(ScanAllByLabelPropertyRange &) override {
bool PostVisit(ScanAllByLabelPropertyRange &scan) override {
prev_ops_.pop_back();
RaiseIfCartesian();
RaiseIfHasWorkerPlan();
should_split_ = true;
if (has_scan_all_) {
// Prepare for Cartesian planning
auto id = AddWorkerPlan(scan.input());
auto left_op = std::make_shared<PullRemote>(
scan.input(), id,
scan.input()->ModifiedSymbols(distributed_plan_.symbol_table));
left_cartesians_.push_back(left_op);
scan.set_input(std::make_shared<Once>());
}
has_scan_all_ = true;
return true;
}
@ -89,10 +189,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&scan);
return true;
}
bool PostVisit(ScanAllByLabelPropertyValue &) override {
bool PostVisit(ScanAllByLabelPropertyValue &scan) override {
prev_ops_.pop_back();
RaiseIfCartesian();
RaiseIfHasWorkerPlan();
should_split_ = true;
if (has_scan_all_) {
// Prepare for Cartesian planning
auto id = AddWorkerPlan(scan.input());
auto left_op = std::make_shared<PullRemote>(
scan.input(), id,
scan.input()->ModifiedSymbols(distributed_plan_.symbol_table));
left_cartesians_.push_back(left_op);
scan.set_input(std::make_shared<Once>());
}
has_scan_all_ = true;
return true;
}
@ -141,10 +249,10 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.pop_back();
if (ShouldSplit()) {
auto input = skip.input();
distributed_plan_.worker_plan = input;
skip.set_input(std::make_shared<PullRemote>(
input, distributed_plan_.plan_id,
input->OutputSymbols(distributed_plan_.symbol_table)));
auto pull_id = AddWorkerPlan(input);
Split(skip, std::make_shared<PullRemote>(
input, pull_id,
input->OutputSymbols(distributed_plan_.symbol_table)));
}
return true;
}
@ -162,18 +270,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.pop_back();
if (ShouldSplit()) {
// Shallow copy Limit
distributed_plan_.worker_plan = std::make_shared<Limit>(limit);
auto pull_id = AddWorkerPlan(std::make_shared<Limit>(limit));
auto input = limit.input();
limit.set_input(std::make_shared<PullRemote>(
input, distributed_plan_.plan_id,
input->OutputSymbols(distributed_plan_.symbol_table)));
Split(limit, std::make_shared<PullRemote>(
input, pull_id,
input->OutputSymbols(distributed_plan_.symbol_table)));
}
return true;
}
// OrderBy is an associative operator, this means we can do ordering
// on workers and then merge the results on master. This requires a more
// involved solution, so for now treat OrderBy just like Split.
// involved solution, so for now treat OrderBy just like Skip.
bool PreVisit(OrderBy &order_by) override {
prev_ops_.push_back(&order_by);
return true;
@ -183,10 +291,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
// TODO: Associative combination of OrderBy
if (ShouldSplit()) {
auto input = order_by.input();
distributed_plan_.worker_plan = input;
order_by.set_input(std::make_shared<PullRemote>(
input, distributed_plan_.plan_id,
input->OutputSymbols(distributed_plan_.symbol_table)));
auto pull_id = AddWorkerPlan(input);
Split(order_by,
std::make_shared<PullRemote>(
input, pull_id,
input->OutputSymbols(distributed_plan_.symbol_table)));
}
return true;
}
@ -200,11 +309,12 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.pop_back();
if (ShouldSplit()) {
// Shallow copy Distinct
distributed_plan_.worker_plan = std::make_shared<Distinct>(distinct);
auto pull_id = AddWorkerPlan(std::make_shared<Distinct>(distinct));
auto input = distinct.input();
distinct.set_input(std::make_shared<PullRemote>(
input, distributed_plan_.plan_id,
input->OutputSymbols(distributed_plan_.symbol_table)));
Split(distinct,
std::make_shared<PullRemote>(
input, pull_id,
input->OutputSymbols(distributed_plan_.symbol_table)));
}
return true;
}
@ -229,6 +339,10 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(Aggregate &aggr_op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(aggr_op, PlanCartesian(aggr_op.input()));
return true;
}
if (!ShouldSplit()) {
// We have already split the plan, so the aggregation we are visiting is
// on master.
@ -251,7 +365,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
};
if (!is_associative()) {
auto input = aggr_op.input();
distributed_plan_.worker_plan = input;
auto pull_id = AddWorkerPlan(input);
std::unordered_set<Symbol> pull_symbols(aggr_op.remember().begin(),
aggr_op.remember().end());
for (const auto &elem : aggr_op.aggregations()) {
@ -261,9 +375,10 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
pull_symbols.insert(collector.symbols_.begin(),
collector.symbols_.end());
}
aggr_op.set_input(std::make_shared<PullRemote>(
input, distributed_plan_.plan_id,
std::vector<Symbol>(pull_symbols.begin(), pull_symbols.end())));
Split(aggr_op,
std::make_shared<PullRemote>(
input, pull_id,
std::vector<Symbol>(pull_symbols.begin(), pull_symbols.end())));
return true;
}
auto make_ident = [this](const auto &symbol) {
@ -357,20 +472,24 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
throw utils::NotYetImplemented("distributed planning");
}
}
// Rewiring is done in PostVisit(Produce), so just store our results.
worker_aggr_ = std::make_shared<Aggregate>(
// Rewire master/worker aggregation.
auto worker_plan = std::make_shared<Aggregate>(
aggr_op.input(), worker_aggrs, aggr_op.group_by(), aggr_op.remember());
auto pull_id = AddWorkerPlan(worker_plan);
std::vector<Symbol> pull_symbols;
pull_symbols.reserve(worker_aggrs.size() + aggr_op.remember().size());
for (const auto &aggr : worker_aggrs)
pull_symbols.push_back(aggr.output_sym);
for (const auto &sym : aggr_op.remember()) pull_symbols.push_back(sym);
auto pull_op = std::make_shared<PullRemote>(
worker_aggr_, distributed_plan_.plan_id, pull_symbols);
auto pull_op =
std::make_shared<PullRemote>(worker_plan, pull_id, pull_symbols);
auto master_aggr_op = std::make_shared<Aggregate>(
pull_op, master_aggrs, aggr_op.group_by(), aggr_op.remember());
// Make our master Aggregate into Produce + Aggregate
master_aggr_ = std::make_unique<Produce>(master_aggr_op, produce_exprs);
auto master_plan = std::make_unique<Produce>(master_aggr_op, produce_exprs);
auto produce = dynamic_cast<Produce *>(prev_ops_.back());
DCHECK(produce) << "Expected Aggregate is directly below Produce";
Split(*produce, std::move(master_plan));
return true;
}
@ -380,13 +499,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(Produce &produce) override {
prev_ops_.pop_back();
if (!master_aggr_) return true;
// We have to rewire master/worker aggregation.
DCHECK(worker_aggr_);
DCHECK(ShouldSplit());
DCHECK(std::dynamic_pointer_cast<Aggregate>(produce.input()));
distributed_plan_.worker_plan = std::move(worker_aggr_);
produce.set_input(std::move(master_aggr_));
if (!left_cartesians_.empty()) {
// TODO: It might be better to plan Cartesians later if this Produce isn't
// the last one and is not followed by an operator which requires a merge
// point (Skip, OrderBy, etc.).
if (!on_master_) {
Split(produce, PlanCartesian(produce.input()));
} else {
// We are on master, so our produce input must come on the left hand
// side.
throw utils::NotYetImplemented("distributed planning");
}
}
return true;
}
@ -420,9 +544,9 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
// same as on workers.
std::shared_ptr<PullRemote> pull_remote;
if (ShouldSplit()) {
distributed_plan_.worker_plan = acc.input();
pull_remote = std::make_shared<PullRemote>(
nullptr, distributed_plan_.plan_id, acc.symbols());
auto pull_id = AddWorkerPlan(acc.input());
pull_remote =
std::make_shared<PullRemote>(nullptr, pull_id, acc.symbols());
}
auto sync = std::make_shared<Synchronize>(acc.input(), pull_remote,
acc.advance_command());
@ -431,9 +555,9 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
// TODO: Find a better way to replace the previous operation's input than
// using dynamic casting.
if (auto *produce = dynamic_cast<Produce *>(prev_op)) {
produce->set_input(sync);
Split(*produce, sync);
} else if (auto *aggr_op = dynamic_cast<Aggregate *>(prev_op)) {
aggr_op->set_input(sync);
Split(*aggr_op, sync);
} else {
throw utils::NotYetImplemented("distributed planning");
}
@ -441,15 +565,19 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
return true;
}
// CRUD operators follow
bool PreVisit(CreateNode &op) override {
// TODO: Creation needs to be modified if running on master, so as to
// distribute node creation to workers.
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(CreateNode &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
return true;
}
@ -458,9 +586,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(CreateExpand &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
return true;
}
@ -469,9 +599,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(Delete &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
return true;
}
@ -480,9 +612,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(SetProperty &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
return true;
}
@ -491,9 +625,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(SetProperties &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
return true;
}
@ -502,9 +638,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(SetLabels &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
return true;
}
@ -513,9 +651,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(RemoveProperty &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
return true;
}
@ -524,9 +664,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(RemoveLabels &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
return true;
}
@ -543,21 +685,29 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
private:
DistributedPlan &distributed_plan_;
// Used for rewiring the master/worker aggregation in PostVisit(Produce)
std::shared_ptr<LogicalOperator> worker_aggr_;
std::unique_ptr<LogicalOperator> master_aggr_;
std::atomic<int64_t> &next_plan_id_;
std::vector<LogicalOperator *> prev_ops_;
// Left side operators that still need to be wired into Cartesian.
std::vector<std::shared_ptr<LogicalOperator>> left_cartesians_;
bool has_scan_all_ = false;
bool needs_synchronize_ = false;
bool should_split_ = false;
// True if we have added a worker merge point on master, i.e. the rest of the
// plan is executing on master.
bool on_master_ = false;
void RaiseIfCartesian() {
if (has_scan_all_)
throw utils::NotYetImplemented("Cartesian product distributed planning");
// Sets the master_op input to be merge_op. Afterwards, on_master_ is true.
template <class TOp>
void Split(TOp &master_op, std::shared_ptr<LogicalOperator> merge_op) {
if (on_master_) throw utils::NotYetImplemented("distributed planning");
master_op.set_input(merge_op);
on_master_ = true;
}
void RaiseIfHasWorkerPlan() {
if (distributed_plan_.worker_plan)
throw utils::NotYetImplemented("distributed planning");
int64_t AddWorkerPlan(const std::shared_ptr<LogicalOperator> &worker_plan) {
should_split_ = false;
return ::query::plan::AddWorkerPlan(distributed_plan_, next_plan_id_,
worker_plan);
}
};
@ -569,31 +719,32 @@ DistributedPlan MakeDistributedPlan(const LogicalOperator &original_plan,
DistributedPlan distributed_plan;
// If we will generate multiple worker plans, we will need to increment the
// next_plan_id for each one.
distributed_plan.plan_id = next_plan_id++;
distributed_plan.master_plan_id = next_plan_id++;
distributed_plan.symbol_table = symbol_table;
std::tie(distributed_plan.master_plan, distributed_plan.ast_storage) =
Clone(original_plan);
DistributedPlanner planner(distributed_plan);
DistributedPlanner planner(distributed_plan, next_plan_id);
distributed_plan.master_plan->Accept(planner);
if (planner.ShouldSplit()) {
// We haven't split the plan, this means that it should be the same on
// master and worker. We only need to prepend PullRemote to master plan.
distributed_plan.worker_plan = std::move(distributed_plan.master_plan);
std::shared_ptr<LogicalOperator> worker_plan(
std::move(distributed_plan.master_plan));
auto pull_id = AddWorkerPlan(distributed_plan, next_plan_id, worker_plan);
// If the plan performs writes, we need to finish with Synchronize.
if (planner.NeedsSynchronize()) {
auto pull_remote = std::make_shared<PullRemote>(
nullptr, distributed_plan.plan_id,
distributed_plan.worker_plan->OutputSymbols(
distributed_plan.symbol_table));
distributed_plan.master_plan = std::make_unique<Synchronize>(
distributed_plan.worker_plan, pull_remote, false);
nullptr, pull_id,
worker_plan->OutputSymbols(distributed_plan.symbol_table));
distributed_plan.master_plan =
std::make_unique<Synchronize>(worker_plan, pull_remote, false);
} else {
distributed_plan.master_plan = std::make_unique<PullRemote>(
distributed_plan.worker_plan, distributed_plan.plan_id,
distributed_plan.worker_plan->OutputSymbols(
distributed_plan.symbol_table));
worker_plan, pull_id,
worker_plan->OutputSymbols(distributed_plan.symbol_table));
}
} else if (!distributed_plan.worker_plan && planner.NeedsSynchronize()) {
} else if (distributed_plan.worker_plans.empty() &&
planner.NeedsSynchronize()) {
// If the plan performs writes, we still need to Synchronize.
distributed_plan.master_plan = std::make_unique<Synchronize>(
std::move(distributed_plan.master_plan), nullptr, false);

View File

@ -10,13 +10,12 @@ namespace query::plan {
/// Complete plan split into master/worker parts.
struct DistributedPlan {
int64_t plan_id;
int64_t master_plan_id;
/// Plan to be executed on the master server.
std::unique_ptr<LogicalOperator> master_plan;
/// Plan to be executed on each worker.
///
/// Worker plan is also shared in the master plan.
std::shared_ptr<LogicalOperator> worker_plan;
/// Pairs of {plan_id, plan} for execution on each worker.
std::vector<std::pair<int64_t, std::shared_ptr<LogicalOperator>>>
worker_plans;
/// Ast storage with newly added expressions.
AstTreeStorage ast_storage;
/// Symbol table with newly added symbols.

View File

@ -105,6 +105,13 @@ std::unique_ptr<Cursor> CreateNode::MakeCursor(
return std::make_unique<CreateNodeCursor>(*this, db);
}
std::vector<Symbol> CreateNode::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(table.at(*node_atom_->identifier_));
return symbols;
}
CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
@ -148,6 +155,14 @@ std::unique_ptr<Cursor> CreateExpand::MakeCursor(
return std::make_unique<CreateExpandCursor>(*this, db);
}
std::vector<Symbol> CreateExpand::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(table.at(*node_atom_->identifier_));
symbols.emplace_back(table.at(*edge_atom_->identifier_));
return symbols;
}
CreateExpand::CreateExpandCursor::CreateExpandCursor(
const CreateExpand &self, database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
@ -290,6 +305,12 @@ std::unique_ptr<Cursor> ScanAll::MakeCursor(
output_symbol_, input_->MakeCursor(db), std::move(vertices), db);
}
std::vector<Symbol> ScanAll::ModifiedSymbols(const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(output_symbol_);
return symbols;
}
ScanAllByLabel::ScanAllByLabel(const std::shared_ptr<LogicalOperator> &input,
Symbol output_symbol, storage::Label label,
GraphView graph_view)
@ -442,6 +463,13 @@ std::unique_ptr<Cursor> Expand::MakeCursor(
return std::make_unique<ExpandCursor>(*this, db);
}
std::vector<Symbol> Expand::ModifiedSymbols(const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(node_symbol());
symbols.emplace_back(edge_symbol());
return symbols;
}
Expand::ExpandCursor::ExpandCursor(const Expand &self,
database::GraphDbAccessor &db)
: self_(self), input_cursor_(self.input_->MakeCursor(db)), db_(db) {}
@ -602,6 +630,14 @@ ExpandVariable::ExpandVariable(
ACCEPT_WITH_INPUT(ExpandVariable)
std::vector<Symbol> ExpandVariable::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(node_symbol());
symbols.emplace_back(edge_symbol());
return symbols;
}
namespace {
/**
* Helper function that returns an iterable over
@ -1341,6 +1377,13 @@ std::unique_ptr<Cursor> ConstructNamedPath::MakeCursor(
return std::make_unique<ConstructNamedPathCursor>(*this, db);
}
std::vector<Symbol> ConstructNamedPath::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(path_symbol_);
return symbols;
}
Filter::Filter(const std::shared_ptr<LogicalOperator> &input,
Expression *expression)
: input_(input ? input : std::make_shared<Once>()),
@ -1353,6 +1396,10 @@ std::unique_ptr<Cursor> Filter::MakeCursor(
return std::make_unique<FilterCursor>(*this, db);
}
std::vector<Symbol> Filter::ModifiedSymbols(const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
Filter::FilterCursor::FilterCursor(const Filter &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {}
@ -1391,6 +1438,10 @@ std::vector<Symbol> Produce::OutputSymbols(
return symbols;
}
std::vector<Symbol> Produce::ModifiedSymbols(const SymbolTable &table) const {
return OutputSymbols(table);
}
const std::vector<NamedExpression *> &Produce::named_expressions() {
return named_expressions_;
}
@ -1424,6 +1475,10 @@ std::unique_ptr<Cursor> Delete::MakeCursor(
return std::make_unique<DeleteCursor>(*this, db);
}
std::vector<Symbol> Delete::ModifiedSymbols(const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
Delete::DeleteCursor::DeleteCursor(const Delete &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {}
@ -1492,6 +1547,11 @@ std::unique_ptr<Cursor> SetProperty::MakeCursor(
return std::make_unique<SetPropertyCursor>(*this, db);
}
std::vector<Symbol> SetProperty::ModifiedSymbols(
const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
SetProperty::SetPropertyCursor::SetPropertyCursor(const SetProperty &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
@ -1542,6 +1602,11 @@ std::unique_ptr<Cursor> SetProperties::MakeCursor(
return std::make_unique<SetPropertiesCursor>(*this, db);
}
std::vector<Symbol> SetProperties::ModifiedSymbols(
const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
SetProperties::SetPropertiesCursor::SetPropertiesCursor(
const SetProperties &self, database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
@ -1634,6 +1699,10 @@ std::unique_ptr<Cursor> SetLabels::MakeCursor(
return std::make_unique<SetLabelsCursor>(*this, db);
}
std::vector<Symbol> SetLabels::ModifiedSymbols(const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
SetLabels::SetLabelsCursor::SetLabelsCursor(const SetLabels &self,
database::GraphDbAccessor &db)
: self_(self), input_cursor_(self.input_->MakeCursor(db)) {}
@ -1669,6 +1738,11 @@ std::unique_ptr<Cursor> RemoveProperty::MakeCursor(
return std::make_unique<RemovePropertyCursor>(*this, db);
}
std::vector<Symbol> RemoveProperty::ModifiedSymbols(
const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
RemoveProperty::RemovePropertyCursor::RemovePropertyCursor(
const RemoveProperty &self, database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
@ -1723,6 +1797,11 @@ std::unique_ptr<Cursor> RemoveLabels::MakeCursor(
return std::make_unique<RemoveLabelsCursor>(*this, db);
}
std::vector<Symbol> RemoveLabels::ModifiedSymbols(
const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
RemoveLabels::RemoveLabelsCursor::RemoveLabelsCursor(
const RemoveLabels &self, database::GraphDbAccessor &db)
: self_(self), input_cursor_(self.input_->MakeCursor(db)) {}
@ -1765,6 +1844,12 @@ std::unique_ptr<Cursor> ExpandUniquenessFilter<TAccessor>::MakeCursor(
return std::make_unique<ExpandUniquenessFilterCursor>(*this, db);
}
template <typename TAccessor>
std::vector<Symbol> ExpandUniquenessFilter<TAccessor>::ModifiedSymbols(
const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
template <typename TAccessor>
ExpandUniquenessFilter<TAccessor>::ExpandUniquenessFilterCursor::
ExpandUniquenessFilterCursor(const ExpandUniquenessFilter &self,
@ -1842,6 +1927,10 @@ std::unique_ptr<Cursor> Accumulate::MakeCursor(
return std::make_unique<Accumulate::AccumulateCursor>(*this, db);
}
std::vector<Symbol> Accumulate::ModifiedSymbols(const SymbolTable &) const {
return symbols_;
}
Accumulate::AccumulateCursor::AccumulateCursor(const Accumulate &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
@ -1895,6 +1984,12 @@ std::unique_ptr<Cursor> Aggregate::MakeCursor(
return std::make_unique<AggregateCursor>(*this, db);
}
std::vector<Symbol> Aggregate::ModifiedSymbols(const SymbolTable &) const {
auto symbols = remember_;
for (const auto &elem : aggregations_) symbols.push_back(elem.output_sym);
return symbols;
}
Aggregate::AggregateCursor::AggregateCursor(const Aggregate &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {}
@ -2172,6 +2267,10 @@ std::vector<Symbol> Skip::OutputSymbols(const SymbolTable &symbol_table) const {
return input_->OutputSymbols(symbol_table);
}
std::vector<Symbol> Skip::ModifiedSymbols(const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
Skip::SkipCursor::SkipCursor(const Skip &self, database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {}
@ -2220,6 +2319,10 @@ std::vector<Symbol> Limit::OutputSymbols(
return input_->OutputSymbols(symbol_table);
}
std::vector<Symbol> Limit::ModifiedSymbols(const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
Limit::LimitCursor::LimitCursor(const Limit &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {}
@ -2282,6 +2385,10 @@ std::vector<Symbol> OrderBy::OutputSymbols(
return input_->OutputSymbols(symbol_table);
}
std::vector<Symbol> OrderBy::ModifiedSymbols(const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
OrderBy::OrderByCursor::OrderByCursor(const OrderBy &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {}
@ -2422,6 +2529,15 @@ std::unique_ptr<Cursor> Merge::MakeCursor(database::GraphDbAccessor &db) const {
return std::make_unique<MergeCursor>(*this, db);
}
std::vector<Symbol> Merge::ModifiedSymbols(const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
// Match and create branches should have the same symbols, so just take one of
// them.
auto my_symbols = merge_match_->OutputSymbols(table);
symbols.insert(symbols.end(), my_symbols.begin(), my_symbols.end());
return symbols;
}
Merge::MergeCursor::MergeCursor(const Merge &self,
database::GraphDbAccessor &db)
: input_cursor_(self.input_->MakeCursor(db)),
@ -2490,6 +2606,13 @@ std::unique_ptr<Cursor> Optional::MakeCursor(
return std::make_unique<OptionalCursor>(*this, db);
}
std::vector<Symbol> Optional::ModifiedSymbols(const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
auto my_symbols = optional_->ModifiedSymbols(table);
symbols.insert(symbols.end(), my_symbols.begin(), my_symbols.end());
return symbols;
}
Optional::OptionalCursor::OptionalCursor(const Optional &self,
database::GraphDbAccessor &db)
: self_(self),
@ -2550,6 +2673,12 @@ std::unique_ptr<Cursor> Unwind::MakeCursor(
return std::make_unique<UnwindCursor>(*this, db);
}
std::vector<Symbol> Unwind::ModifiedSymbols(const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(output_symbol_);
return symbols;
}
Unwind::UnwindCursor::UnwindCursor(const Unwind &self,
database::GraphDbAccessor &db)
: self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {}
@ -2603,6 +2732,10 @@ std::vector<Symbol> Distinct::OutputSymbols(
return input_->OutputSymbols(symbol_table);
}
std::vector<Symbol> Distinct::ModifiedSymbols(const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
Distinct::DistinctCursor::DistinctCursor(const Distinct &self,
database::GraphDbAccessor &db)
: self_(self), input_cursor_(self.input_->MakeCursor(db)) {}
@ -2691,6 +2824,10 @@ std::vector<Symbol> Union::OutputSymbols(const SymbolTable &) const {
return union_symbols_;
}
std::vector<Symbol> Union::ModifiedSymbols(const SymbolTable &) const {
return union_symbols_;
}
Union::UnionCursor::UnionCursor(const Union &self,
database::GraphDbAccessor &db)
: self_(self),
@ -2731,6 +2868,20 @@ PullRemote::PullRemote(const std::shared_ptr<LogicalOperator> &input,
ACCEPT_WITH_INPUT(PullRemote);
std::vector<Symbol> PullRemote::OutputSymbols(const SymbolTable &table) const {
return input_ ? input_->OutputSymbols(table) : std::vector<Symbol>{};
}
std::vector<Symbol> PullRemote::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = symbols_;
if (input_) {
auto input_symbols = input_->ModifiedSymbols(table);
symbols.insert(symbols.end(), input_symbols.begin(), input_symbols.end());
}
return symbols;
}
PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self,
database::GraphDbAccessor &db)
: self_(self),
@ -2871,6 +3022,16 @@ bool Synchronize::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
return visitor.PostVisit(*this);
}
std::vector<Symbol> Synchronize::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
if (pull_remote_) {
auto pull_symbols = pull_remote_->ModifiedSymbols(table);
symbols.insert(symbols.end(), pull_symbols.begin(), pull_symbols.end());
}
return symbols;
}
namespace {
class SynchronizeCursor : public Cursor {
public:
@ -3100,6 +3261,13 @@ std::unique_ptr<Cursor> Cartesian::MakeCursor(
return std::make_unique<CartesianCursor>(*this, db);
}
std::vector<Symbol> Cartesian::ModifiedSymbols(const SymbolTable &table) const {
auto symbols = left_op_->ModifiedSymbols(table);
auto right = right_op_->ModifiedSymbols(table);
symbols.insert(symbols.end(), right.begin(), right.end());
return symbols;
}
} // namespace query::plan
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Once);

View File

@ -129,7 +129,7 @@ class HierarchicalLogicalOperatorVisitor
using typename LogicalOperatorLeafVisitor::ReturnType;
};
/** @brief Base class for logical operators.
/** Base class for logical operators.
*
* Each operator describes an operation, which is to be performed on the
* database. Operators are iterated over using a @c Cursor. Various operators
@ -138,7 +138,9 @@ class HierarchicalLogicalOperatorVisitor
class LogicalOperator
: public ::utils::Visitable<HierarchicalLogicalOperatorVisitor> {
public:
/** @brief Constructs a @c Cursor which is used to run this operator.
virtual ~LogicalOperator() {}
/** Constructs a @c Cursor which is used to run this operator.
*
* @param database::GraphDbAccessor Used to perform operations on the
* database.
@ -146,13 +148,11 @@ class LogicalOperator
virtual std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const = 0;
/** @brief Return @c Symbol vector where the results will be stored.
/** Return @c Symbol vector where the query results will be stored.
*
* Currently, outputs symbols are generated in @c Produce and @c Union
* operators. @c Skip, @c Limit, @c OrderBy and @c Distinct propagate the
* symbols from @c Produce (if it exists as input operator). In the future, we
* may want this method to return the symbols that will be set in this
* operator.
* symbols from @c Produce (if it exists as input operator).
*
* @param SymbolTable used to find symbols for expressions.
* @return std::vector<Symbol> used for results.
@ -161,7 +161,23 @@ class LogicalOperator
return std::vector<Symbol>();
}
virtual ~LogicalOperator() {}
/**
* Symbol vector whose values are modified by this operator sub-tree.
*
* This is different than @c OutputSymbols, because it returns all of the
* modified symbols, including those that may not be returned as the
* result of the query. Note that the modified symbols will not contain
* those that should not be read after the operator is processed.
*
* For example, `MATCH (n)-[e]-(m) RETURN n AS l` will generate `ScanAll (n) >
* Expand (e, m) > Produce (l)`. The modified symbols on Produce sub-tree will
* be `l`, the same as output symbols, because it isn't valid to read `n`, `e`
* nor `m` after Produce. On the other hand, modified symbols from Expand
* contain `e` and `m`, as well as `n`, while output symbols are empty.
* Modified symbols from ScanAll contain only `n`, while output symbols are
* also empty.
*/
virtual std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const = 0;
private:
friend class boost::serialization::access;
@ -188,6 +204,9 @@ class Once : public LogicalOperator {
DEFVISITABLE(HierarchicalLogicalOperatorVisitor);
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override {
return {};
}
private:
class OnceCursor : public Cursor {
@ -230,6 +249,10 @@ class CreateNode : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
private:
CreateNode() {}
@ -306,6 +329,10 @@ class CreateExpand : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
private:
// info on what's getting expanded
@ -402,8 +429,10 @@ class ScanAll : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
auto output_symbol() const { return output_symbol_; }
auto graph_view() const { return graph_view_; }
@ -752,6 +781,7 @@ class Expand : public LogicalOperator, public ExpandCommon {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
class ExpandCursor : public Cursor {
public:
@ -873,6 +903,7 @@ class ExpandVariable : public LogicalOperator, public ExpandCommon {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto type() const { return type_; }
@ -937,6 +968,7 @@ class ConstructNamedPath : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
const auto &input() const { return input_; }
const auto &path_symbol() const { return path_symbol_; }
@ -975,6 +1007,7 @@ class Filter : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
private:
std::shared_ptr<LogicalOperator> input_;
@ -1032,6 +1065,8 @@ class Produce : public LogicalOperator {
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
const std::vector<NamedExpression *> &named_expressions();
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
@ -1085,6 +1120,10 @@ class Delete : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
private:
std::shared_ptr<LogicalOperator> input_;
@ -1142,6 +1181,10 @@ class SetProperty : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
private:
std::shared_ptr<LogicalOperator> input_;
@ -1212,6 +1255,10 @@ class SetProperties : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
private:
std::shared_ptr<LogicalOperator> input_;
@ -1278,6 +1325,10 @@ class SetLabels : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
private:
std::shared_ptr<LogicalOperator> input_;
@ -1319,6 +1370,10 @@ class RemoveProperty : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
private:
std::shared_ptr<LogicalOperator> input_;
@ -1371,6 +1426,10 @@ class RemoveLabels : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
private:
std::shared_ptr<LogicalOperator> input_;
@ -1433,6 +1492,7 @@ class ExpandUniquenessFilter : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
private:
std::shared_ptr<LogicalOperator> input_;
@ -1498,6 +1558,7 @@ class Accumulate : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
const auto &symbols() const { return symbols_; };
@ -1600,6 +1661,7 @@ class Aggregate : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
const auto &aggregations() const { return aggregations_; }
const auto &group_by() const { return group_by_; }
@ -1736,6 +1798,7 @@ class Skip : public LogicalOperator {
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
@ -1803,6 +1866,7 @@ class Limit : public LogicalOperator {
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
@ -1868,6 +1932,7 @@ class OrderBy : public LogicalOperator {
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
const auto &output_symbols() const { return output_symbols_; }
auto input() const { return input_; }
@ -1976,6 +2041,7 @@ class Merge : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
auto merge_match() const { return merge_match_; }
@ -2034,6 +2100,7 @@ class Optional : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
auto optional() const { return optional_; }
@ -2088,6 +2155,7 @@ class Unwind : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
Expression *input_expression() const { return input_expression_; }
@ -2151,6 +2219,7 @@ class Distinct : public LogicalOperator {
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
@ -2204,6 +2273,9 @@ class CreateIndex : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override {
return {};
}
auto label() const { return label_; }
auto property() const { return property_; }
@ -2242,6 +2314,7 @@ class Union : public LogicalOperator {
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
private:
std::shared_ptr<LogicalOperator> left_op_, right_op_;
@ -2278,7 +2351,7 @@ class Union : public LogicalOperator {
* other workers) frames. Obtaining remote frames is done through RPC calls to
* `distributed::RemoteProduceRpcServer`s running on all the workers.
*
* This operator aims to yield results as fast as possible and loose minimal
* This operator aims to yield results as fast as possible and lose minimal
* time on data transfer. It gives no guarantees on result order.
*/
class PullRemote : public LogicalOperator {
@ -2288,10 +2361,9 @@ class PullRemote : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override {
return symbols_;
}
const auto &symbols() const { return symbols_; }
auto plan_id() const { return plan_id_; }
@ -2348,7 +2420,7 @@ class PullRemote : public LogicalOperator {
* 5. Tell all the workers to apply their updates. This is async.
* 6. Apply local updates, in parallel with 5. on the workers.
* 7. Notify workers that the command has advanced, if necessary.
* 8. Yield all the resutls, first local, then from RemotePull if available.
* 8. Yield all the results, first local, then from RemotePull if available.
*/
class Synchronize : public LogicalOperator {
public:
@ -2361,6 +2433,7 @@ class Synchronize : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
std::vector<Symbol> OutputSymbols(
const SymbolTable &symbol_table) const override {
@ -2404,6 +2477,7 @@ class Cartesian : public LogicalOperator {
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
auto left_op() const { return left_op_; }
auto left_symbols() const { return left_symbols_; }

View File

@ -504,13 +504,13 @@ class PlanPrinter : public query::plan::HierarchicalLogicalOperatorVisitor {
bool PreVisit(query::plan::PullRemote &op) override {
WithPrintLn([&op](auto &out) {
out << "* PullRemote {";
out << "* PullRemote [" << op.plan_id() << "] {";
utils::PrintIterable(
out, op.symbols(), ", ",
[](auto &out, const auto &sym) { out << sym.name(); });
out << "}";
});
WithPrintLn([](auto &out) { out << " \\"; });
WithPrintLn([](auto &out) { out << "|\\"; });
++depth_;
WithPrintLn([](auto &out) { out << "* workers"; });
--depth_;
@ -526,6 +526,13 @@ class PlanPrinter : public query::plan::HierarchicalLogicalOperatorVisitor {
op.input()->Accept(*this);
return false;
}
bool PreVisit(query::plan::Cartesian &op) override {
WithPrintLn([](auto &out) { out << "* Cartesian"; });
Branch(*op.right_op());
op.left_op()->Accept(*this);
return false;
}
#undef PRE_VISIT
private:
@ -533,7 +540,10 @@ class PlanPrinter : public query::plan::HierarchicalLogicalOperatorVisitor {
// corresponding to the current depth_.
template <class TFun>
void WithPrintLn(TFun fun) {
std::cout << std::string(depth_ * 2, ' ');
std::cout << " ";
for (int i = 0; i < depth_; ++i) {
std::cout << "| ";
}
fun(std::cout);
std::cout << std::endl;
}
@ -542,7 +552,7 @@ class PlanPrinter : public query::plan::HierarchicalLogicalOperatorVisitor {
// and printing the branch name.
void Branch(query::plan::LogicalOperator &op,
const std::string &branch_name = "") {
WithPrintLn([&](auto &out) { out << " \\ " << branch_name; });
WithPrintLn([&](auto &out) { out << "|\\ " << branch_name; });
++depth_;
op.Accept(*this);
--depth_;
@ -625,10 +635,13 @@ DEFCOMMAND(ShowDistributed) {
distributed_plan.master_plan->Accept(printer);
std::cout << std::endl;
}
if (distributed_plan.worker_plan) {
std::cout << "---- Worker Plan ---- " << std::endl;
for (size_t i = 0; i < distributed_plan.worker_plans.size(); ++i) {
int64_t id;
std::shared_ptr<query::plan::LogicalOperator> worker_plan;
std::tie(id, worker_plan) = distributed_plan.worker_plans[i];
std::cout << "---- Worker Plan #" << id << " ---- " << std::endl;
PlanPrinter printer(dba);
distributed_plan.worker_plan->Accept(printer);
worker_plan->Accept(printer);
std::cout << std::endl;
}
}

View File

@ -120,6 +120,11 @@ class PlanChecker : public HierarchicalLogicalOperatorVisitor {
op.input()->Accept(*this);
return false;
}
bool PreVisit(Cartesian &op) override {
CheckOp(op);
return false;
}
#undef PRE_VISIT
std::list<BaseOpChecker *> checkers_;
@ -408,6 +413,26 @@ class ExpectSynchronize : public OpChecker<Synchronize> {
bool advance_command_ = false;
};
class ExpectCartesian : public OpChecker<Cartesian> {
public:
ExpectCartesian(const std::list<std::unique_ptr<BaseOpChecker>> &left,
const std::list<std::unique_ptr<BaseOpChecker>> &right)
: left_(left), right_(right) {}
void ExpectOp(Cartesian &op, const SymbolTable &symbol_table) override {
ASSERT_TRUE(op.left_op());
PlanChecker left_checker(left_, symbol_table);
op.left_op()->Accept(left_checker);
ASSERT_TRUE(op.right_op());
PlanChecker right_checker(right_, symbol_table);
op.right_op()->Accept(right_checker);
}
private:
const std::list<std::unique_ptr<BaseOpChecker>> &left_;
const std::list<std::unique_ptr<BaseOpChecker>> &right_;
};
auto MakeSymbolTable(query::Query &query) {
SymbolTable symbol_table;
SymbolGenerator symbol_generator(symbol_table);
@ -482,7 +507,7 @@ auto CheckPlan(AstTreeStorage &storage, TChecker... checker) {
struct ExpectedDistributedPlan {
std::list<std::unique_ptr<BaseOpChecker>> master_checkers;
std::list<std::unique_ptr<BaseOpChecker>> worker_checkers;
std::vector<std::list<std::unique_ptr<BaseOpChecker>>> worker_checkers;
};
template <class TPlanner>
@ -501,13 +526,17 @@ void CheckDistributedPlan(DistributedPlan &distributed_plan,
distributed_plan.master_plan->Accept(plan_checker);
EXPECT_TRUE(plan_checker.checkers_.empty());
if (expected.worker_checkers.empty()) {
EXPECT_FALSE(distributed_plan.worker_plan);
EXPECT_TRUE(distributed_plan.worker_plans.empty());
} else {
ASSERT_TRUE(distributed_plan.worker_plan);
PlanChecker plan_checker(expected.worker_checkers,
distributed_plan.symbol_table);
distributed_plan.worker_plan->Accept(plan_checker);
EXPECT_TRUE(plan_checker.checkers_.empty());
ASSERT_EQ(distributed_plan.worker_plans.size(),
expected.worker_checkers.size());
for (size_t i = 0; i < expected.worker_checkers.size(); ++i) {
PlanChecker plan_checker(expected.worker_checkers[i],
distributed_plan.symbol_table);
auto worker_plan = distributed_plan.worker_plans[i].second;
worker_plan->Accept(plan_checker);
EXPECT_TRUE(plan_checker.checkers_.empty());
}
}
}
@ -516,6 +545,7 @@ void CheckDistributedPlan(const LogicalOperator &plan,
ExpectedDistributedPlan &expected_distributed_plan) {
std::atomic<int64_t> next_plan_id{0};
auto distributed_plan = MakeDistributedPlan(plan, symbol_table, next_plan_id);
EXPECT_EQ(next_plan_id - 1, distributed_plan.worker_plans.size());
CheckDistributedPlan(distributed_plan, expected_distributed_plan);
}
@ -540,6 +570,43 @@ std::list<std::unique_ptr<BaseOpChecker>> MakeCheckers(T arg, Rest &&... rest) {
return std::move(l);
}
ExpectedDistributedPlan ExpectDistributed(
std::list<std::unique_ptr<BaseOpChecker>> master_checker) {
return ExpectedDistributedPlan{std::move(master_checker)};
}
ExpectedDistributedPlan ExpectDistributed(
std::list<std::unique_ptr<BaseOpChecker>> master_checker,
std::list<std::unique_ptr<BaseOpChecker>> worker_checker) {
ExpectedDistributedPlan expected{std::move(master_checker)};
expected.worker_checkers.emplace_back(std::move(worker_checker));
return expected;
}
void AddWorkerCheckers(
ExpectedDistributedPlan &expected,
std::list<std::unique_ptr<BaseOpChecker>> worker_checker) {
expected.worker_checkers.emplace_back(std::move(worker_checker));
}
template <class... Rest>
void AddWorkerCheckers(ExpectedDistributedPlan &expected,
std::list<std::unique_ptr<BaseOpChecker>> worker_checker,
Rest &&... rest) {
expected.worker_checkers.emplace_back(std::move(worker_checker));
AddWorkerCheckers(expected, std::forward<Rest>(rest)...);
}
template <class... Rest>
ExpectedDistributedPlan ExpectDistributed(
std::list<std::unique_ptr<BaseOpChecker>> master_checker,
std::list<std::unique_ptr<BaseOpChecker>> worker_checker, Rest &&... rest) {
ExpectedDistributedPlan expected{std::move(master_checker)};
expected.worker_checkers.emplace_back(std::move(worker_checker));
AddWorkerCheckers(expected, std::forward<Rest>(rest)...);
return expected;
}
template <class T>
class TestPlanner : public ::testing::Test {};
@ -557,9 +624,9 @@ TYPED_TEST(TestPlanner, MatchNodeReturn) {
auto planner = MakePlanner<TypeParam>(db, storage, symbol_table);
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectProduce())};
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -576,10 +643,8 @@ TYPED_TEST(TestPlanner, CreateNodeReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc,
ExpectProduce());
{
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectSynchronize(false),
ExpectProduce()),
{}};
auto expected = ExpectDistributed(MakeCheckers(
ExpectCreateNode(), ExpectSynchronize(false), ExpectProduce()));
std::atomic<int64_t> next_plan_id{0};
auto distributed_plan =
MakeDistributedPlan(planner.plan(), symbol_table, next_plan_id);
@ -661,9 +726,9 @@ TYPED_TEST(TestPlanner, MatchCreateExpand) {
CREATE(PATTERN(NODE("n"), EDGE("r", Direction::OUT, {relationship}),
NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectCreateExpand());
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectCreateExpand(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectCreateExpand())};
MakeCheckers(ExpectScanAll(), ExpectCreateExpand()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -680,9 +745,9 @@ TYPED_TEST(TestPlanner, MatchLabeledNodes) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAllByLabel(),
ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAllByLabel(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAllByLabel(), ExpectProduce())};
MakeCheckers(ExpectScanAllByLabel(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -702,9 +767,9 @@ TYPED_TEST(TestPlanner, MatchPathReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce())};
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -725,11 +790,11 @@ TYPED_TEST(TestPlanner, MatchNamedPatternReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
ExpectConstructNamedPath(), ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_p)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(),
ExpectProduce())};
ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -750,11 +815,11 @@ TYPED_TEST(TestPlanner, MatchNamedPatternWithPredicateReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
ExpectConstructNamedPath(), ExpectFilter(), ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_p)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(),
ExpectFilter(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(),
ExpectFilter(), ExpectProduce())};
ExpectFilter(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -779,11 +844,11 @@ TYPED_TEST(TestPlanner, OptionalMatchNamedPatternReturn) {
auto planner = MakePlanner<TypeParam>(db, storage, symbol_table);
CheckPlan(planner.plan(), symbol_table,
ExpectOptional(optional_symbols, optional), ExpectProduce());
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectOptional(optional_symbols, optional), ExpectProduce(),
ExpectPullRemote({symbol_table.at(*as_p)})),
MakeCheckers(ExpectOptional(optional_symbols, optional),
ExpectProduce())};
ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -802,9 +867,9 @@ TYPED_TEST(TestPlanner, MatchWhereReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectFilter(),
ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectProduce())};
MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -813,9 +878,9 @@ TYPED_TEST(TestPlanner, MatchDelete) {
AstTreeStorage storage;
QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), DELETE(IDENT("n"))));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectDelete());
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectDelete(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectDelete())};
MakeCheckers(ExpectScanAll(), ExpectDelete()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -831,11 +896,11 @@ TYPED_TEST(TestPlanner, MatchNodeSet) {
SET("n", IDENT("n")), SET("n", {label})));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectSetProperty(),
ExpectSetProperties(), ExpectSetLabels());
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectSetProperty(), ExpectSetProperties(),
ExpectSetLabels(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectSetProperty(), ExpectSetProperties(),
ExpectSetLabels())};
ExpectSetLabels()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -850,11 +915,11 @@ TYPED_TEST(TestPlanner, MatchRemove) {
REMOVE(PROPERTY_LOOKUP("n", prop)), REMOVE("n", {label})));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectRemoveProperty(),
ExpectRemoveLabels());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectRemoveProperty(),
ExpectRemoveLabels(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectRemoveProperty(),
ExpectRemoveLabels())};
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectRemoveProperty(),
ExpectRemoveLabels(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectRemoveProperty(),
ExpectRemoveLabels()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -900,16 +965,44 @@ TYPED_TEST(TestPlanner, MatchMultiPatternSameExpandStart) {
TYPED_TEST(TestPlanner, MultiMatch) {
// Test MATCH (n) -[r]- (m) MATCH (j) -[e]- (i) -[f]- (h) RETURN n
AstTreeStorage storage;
QUERY(SINGLE_QUERY(
MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m"))),
MATCH(PATTERN(NODE("j"), EDGE("e"), NODE("i"), EDGE("f"), NODE("h"))),
RETURN("n")));
auto *node_n = NODE("n");
auto *edge_r = EDGE("r");
auto *node_m = NODE("m");
auto *node_j = NODE("j");
auto *edge_e = EDGE("e");
auto *node_i = NODE("i");
auto *edge_f = EDGE("f");
auto *node_h = NODE("h");
QUERY(SINGLE_QUERY(MATCH(PATTERN(node_n, edge_r, node_m)),
MATCH(PATTERN(node_j, edge_e, node_i, edge_f, node_h)),
RETURN("n")));
auto symbol_table = MakeSymbolTable(*storage.query());
database::SingleNode db;
auto planner = MakePlanner<TypeParam>(db, storage, symbol_table);
// Multiple MATCH clauses form a Cartesian product, so the uniqueness should
// not cross MATCH boundaries.
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectExpand(),
ExpectScanAll(), ExpectExpand(), ExpectExpand(),
ExpectExpandUniquenessFilter<EdgeAccessor>(),
ExpectProduce());
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
ExpectScanAll(), ExpectExpand(), ExpectExpand(),
ExpectExpandUniquenessFilter<EdgeAccessor>(), ExpectProduce());
auto get_symbol = [&symbol_table](const auto *atom_node) {
return symbol_table.at(*atom_node->identifier_);
};
ExpectPullRemote left_pull(
{get_symbol(node_n), get_symbol(edge_r), get_symbol(node_m)});
auto left_cart = MakeCheckers(ExpectScanAll(), ExpectExpand(), left_pull);
ExpectPullRemote right_pull({get_symbol(node_j), get_symbol(edge_e),
get_symbol(node_i), get_symbol(edge_f),
get_symbol(node_h)});
auto right_cart =
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectExpand(),
ExpectExpandUniquenessFilter<EdgeAccessor>(), right_pull);
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
ExpectProduce()),
MakeCheckers(ExpectScanAll(), ExpectExpand()),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectExpand(),
ExpectExpandUniquenessFilter<EdgeAccessor>()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
TYPED_TEST(TestPlanner, MultiMatchSameStart) {
@ -927,9 +1020,9 @@ TYPED_TEST(TestPlanner, MultiMatchSameStart) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce())};
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -946,9 +1039,9 @@ TYPED_TEST(TestPlanner, MatchWithReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce(),
ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_new)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectProduce())};
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -968,11 +1061,11 @@ TYPED_TEST(TestPlanner, MatchWithWhereReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce(),
ExpectFilter(), ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_new)});
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectFilter(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectFilter(),
ExpectProduce())};
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectProduce(),
ExpectFilter(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectProduce(),
ExpectFilter(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1036,10 +1129,10 @@ TYPED_TEST(TestPlanner, MatchReturnSum) {
auto master_aggr = ExpectMasterAggregate({merge_sum}, {n_prop2});
ExpectPullRemote pull(
{symbol_table.at(*sum), symbol_table.at(*n_prop2->expression_)});
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), aggr, pull, master_aggr, ExpectProduce(),
ExpectProduce()),
MakeCheckers(ExpectScanAll(), aggr)};
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), aggr, pull, master_aggr,
ExpectProduce(), ExpectProduce()),
MakeCheckers(ExpectScanAll(), aggr));
CheckDistributedPlan(distributed_plan, expected);
}
}
@ -1076,10 +1169,10 @@ TYPED_TEST(TestPlanner, MatchWithCreate) {
PATTERN(NODE("a"), EDGE("r", Direction::OUT, {r_type}), NODE("b")))));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectProduce(),
ExpectCreateExpand());
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectCreateExpand(),
ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectCreateExpand())};
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectCreateExpand()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1095,10 +1188,10 @@ TYPED_TEST(TestPlanner, MatchReturnSkipLimit) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce(),
ExpectSkip(), ExpectLimit());
ExpectPullRemote pull({symbol_table.at(*as_n)});
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectProduce(), pull, ExpectSkip(),
ExpectLimit()),
MakeCheckers(ExpectScanAll(), ExpectProduce())};
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectProduce(), pull,
ExpectSkip(), ExpectLimit()),
MakeCheckers(ExpectScanAll(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1121,9 +1214,8 @@ TYPED_TEST(TestPlanner, CreateWithSkipReturnLimit) {
CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc,
ExpectProduce(), ExpectSkip(), ExpectProduce(), ExpectLimit());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectSynchronize(true),
ExpectProduce(), ExpectSkip(), ExpectProduce(),
ExpectLimit()),
MakeCheckers(ExpectCreateNode(), ExpectSynchronize(true), ExpectProduce(),
ExpectSkip(), ExpectProduce(), ExpectLimit()),
{}};
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1161,9 +1253,9 @@ TYPED_TEST(TestPlanner, MatchReturnOrderBy) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce(),
ExpectOrderBy());
ExpectPullRemote pull({symbol_table.at(*as_n)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectProduce(), pull, ExpectOrderBy()),
MakeCheckers(ExpectScanAll(), ExpectProduce())};
MakeCheckers(ExpectScanAll(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1270,9 +1362,9 @@ TYPED_TEST(TestPlanner, MatchUnwindReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectUnwind(),
ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n), symbol_table.at(*as_x)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectUnwind(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectUnwind(), ExpectProduce())};
MakeCheckers(ExpectScanAll(), ExpectUnwind(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1283,10 +1375,9 @@ TYPED_TEST(TestPlanner, ReturnDistinctOrderBySkipLimit) {
SKIP(LITERAL(1)), LIMIT(LITERAL(1)))));
CheckPlan<TypeParam>(storage, ExpectProduce(), ExpectDistinct(),
ExpectOrderBy(), ExpectSkip(), ExpectLimit());
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectProduce(), ExpectDistinct(), ExpectOrderBy(),
ExpectSkip(), ExpectLimit()),
{}};
ExpectSkip(), ExpectLimit()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1344,11 +1435,11 @@ TYPED_TEST(TestPlanner, MatchWhereBeforeExpand) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectFilter(),
ExpectExpand(), ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectExpand(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectExpand(),
ExpectProduce())};
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectFilter(),
ExpectExpand(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectFilter(),
ExpectExpand(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1474,7 +1565,7 @@ TYPED_TEST(TestPlanner, FunctionAggregationReturn) {
RETURN(FN("sqrt", sum), AS("result"), group_by_literal, AS("group_by"))));
auto aggr = ExpectAggregate({sum}, {group_by_literal});
CheckPlan<TypeParam>(storage, aggr, ExpectProduce());
ExpectedDistributedPlan expected{MakeCheckers(aggr, ExpectProduce()), {}};
auto expected = ExpectDistributed(MakeCheckers(aggr, ExpectProduce()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1483,7 +1574,7 @@ TYPED_TEST(TestPlanner, FunctionWithoutArguments) {
AstTreeStorage storage;
QUERY(SINGLE_QUERY(RETURN(FN("pi"), AS("pi"))));
CheckPlan<TypeParam>(storage, ExpectProduce());
ExpectedDistributedPlan expected{MakeCheckers(ExpectProduce()), {}};
auto expected = ExpectDistributed(MakeCheckers(ExpectProduce()));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1585,8 +1676,8 @@ TYPED_TEST(TestPlanner, CreateIndex) {
AstTreeStorage storage;
QUERY(SINGLE_QUERY(CREATE_INDEX_ON(label, property)));
CheckPlan<TypeParam>(storage, ExpectCreateIndex(label, property));
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateIndex(label, property)), {}};
auto expected =
ExpectDistributed(MakeCheckers(ExpectCreateIndex(label, property)));
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1736,8 +1827,9 @@ TYPED_TEST(TestPlanner, WhereIndexedLabelPropertyRange) {
AstTreeStorage storage;
auto lit_42 = LITERAL(42);
auto n_prop = PROPERTY_LOOKUP("n", property);
auto check_planned_range = [&label, &property, &db](
const auto &rel_expr, auto lower_bound, auto upper_bound) {
auto check_planned_range = [&label, &property, &db](const auto &rel_expr,
auto lower_bound,
auto upper_bound) {
// Shadow the first storage, so that the query is created in this one.
AstTreeStorage storage;
QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", label))), WHERE(rel_expr),
@ -2013,9 +2105,9 @@ TYPED_TEST(TestPlanner, DistributedAvg) {
auto worker_sum = SUM(PROPERTY_LOOKUP("n", prop));
auto worker_count = COUNT(PROPERTY_LOOKUP("n", prop));
{
ASSERT_TRUE(distributed_plan.worker_plan);
auto worker_aggr_op =
std::dynamic_pointer_cast<Aggregate>(distributed_plan.worker_plan);
ASSERT_EQ(distributed_plan.worker_plans.size(), 1U);
auto worker_plan = distributed_plan.worker_plans.back().second;
auto worker_aggr_op = std::dynamic_pointer_cast<Aggregate>(worker_plan);
ASSERT_TRUE(worker_aggr_op);
ASSERT_EQ(worker_aggr_op->aggregations().size(), 2U);
symbol_table[*worker_sum] = worker_aggr_op->aggregations()[0].output_sym;
@ -2027,10 +2119,10 @@ TYPED_TEST(TestPlanner, DistributedAvg) {
auto master_aggr = ExpectMasterAggregate({merge_sum, merge_count}, {});
ExpectPullRemote pull(
{symbol_table.at(*worker_sum), symbol_table.at(*worker_count)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), worker_aggr, pull, master_aggr,
ExpectProduce(), ExpectProduce()),
MakeCheckers(ExpectScanAll(), worker_aggr)};
MakeCheckers(ExpectScanAll(), worker_aggr));
CheckDistributedPlan(distributed_plan, expected);
}
@ -2047,9 +2139,9 @@ TYPED_TEST(TestPlanner, DistributedCollectList) {
auto &symbol_table = distributed_plan.symbol_table;
auto aggr = ExpectAggregate({collect}, {});
ExpectPullRemote pull({symbol_table.at(*node_n->identifier_)});
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), pull, aggr, ExpectProduce()),
MakeCheckers(ExpectScanAll())};
MakeCheckers(ExpectScanAll()));
CheckDistributedPlan(distributed_plan, expected);
}
@ -2063,11 +2155,11 @@ TYPED_TEST(TestPlanner, DistributedMatchCreateReturn) {
auto acc = ExpectAccumulate({symbol_table.at(*ident_m)});
database::Master db;
auto planner = MakePlanner<TypeParam>(db, storage, symbol_table);
ExpectedDistributedPlan expected{
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectCreateNode(),
ExpectSynchronize({symbol_table.at(*ident_m)}),
ExpectProduce()),
MakeCheckers(ExpectScanAll(), ExpectCreateNode())};
MakeCheckers(ExpectScanAll(), ExpectCreateNode()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
} // namespace