Handle indexed ScanAll in distributed Cartesian

Reviewers: mtomic, msantl, buda

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1435
This commit is contained in:
Teon Banek 2018-06-29 10:00:06 +02:00
parent 8fb6f3b5ce
commit 843aa4f92a
3 changed files with 560 additions and 130 deletions

View File

@ -40,14 +40,33 @@ int64_t AddWorkerPlan(DistributedPlan &distributed_plan,
return plan_id;
}
// Describes one candidate branch of a Cartesian operator: the independent
// `subtree` itself plus, optionally, the chain of operators
// (`parent_start` ... `parent_end`) that had to be kept out of the subtree
// because they depend on symbols from another branch.
struct Branch {
// Independent subtree that can become a Cartesian branch
std::shared_ptr<LogicalOperator> subtree;
// Optional parent of the subtree which needs to come after the Cartesian
// operator, because it depends on some other Cartesian branch.
// parent_start is shared_ptr, because the ownership will be transferred to
// some other operator.
std::shared_ptr<LogicalOperator> parent_start;
// parent_end is pointer, because we may only change its input.
LogicalOperator *parent_end{nullptr};
// Minimum index of the branch this parent depends on. Left empty when no
// dependency has been recorded.
std::experimental::optional<int64_t> depends_on;
};
// Find the subtree parent, below which no operator uses symbols found in the
// `forbidden_symbols` set.
// `forbidden_symbols` set. The operator tree may be modified in cases when an
// indexed lookup is split to regular lookup + filtering.
// TODO: After indexed lookup is moved to another stage, then this class should
// never visit such lookups or modify the tree.
class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
public:
IndependentSubtreeFinder(
const std::vector<std::vector<Symbol>> &forbidden_symbols,
const SymbolTable *symbol_table)
: forbidden_symbols_(forbidden_symbols), symbol_table_(symbol_table) {}
SymbolTable *symbol_table, AstStorage *storage)
: forbidden_symbols_(forbidden_symbols),
symbol_table_(symbol_table),
storage_(storage) {}
using HierarchicalLogicalOperatorVisitor::PostVisit;
using HierarchicalLogicalOperatorVisitor::PreVisit;
@ -59,54 +78,179 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
bool Visit(ModifyUser &) override { return true; }
bool Visit(DropUser &) override { return true; }
bool PostVisit(ScanAll &scan) override { return true; }
bool PostVisit(ScanAllByLabel &scan) override { return true; }
// Records `scan` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(ScanAll &scan) override {
prev_ops_.push_back(&scan);
return true;
}
// Removes the entry pushed by the matching PreVisit. A plain ScanAll has no
// expressions to inspect, so nothing else is checked. Always returns true.
bool PostVisit(ScanAll &scan) override {
prev_ops_.pop_back();
return true;
}
// Records `scan` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(ScanAllByLabel &scan) override {
prev_ops_.push_back(&scan);
return true;
}
// Removes the entry pushed by the matching PreVisit; no symbol checks are
// performed for a label-only scan. Always returns true.
bool PostVisit(ScanAllByLabel &scan) override {
prev_ops_.pop_back();
return true;
}
// Records `scan` on `prev_ops_`. The matching PostVisit pops it and then uses
// `prev_ops_.back()` (if any) to re-point that operator's input when the scan
// is split into a simpler scan plus a Filter. Always returns true.
bool PreVisit(ScanAllByLabelPropertyRange &scan) override {
prev_ops_.push_back(&scan);
return true;
}
bool PostVisit(ScanAllByLabelPropertyRange &scan) override {
if (subtree_) return true;
prev_ops_.pop_back();
if (branch_.subtree) return true;
auto find_forbidden =
[this](auto maybe_bound) -> std::experimental::optional<int64_t> {
if (!maybe_bound) return std::experimental::nullopt;
UsedSymbolsCollector collector(*symbol_table_);
maybe_bound->value()->Accept(collector);
return this->ContainsForbidden(collector.symbols_);
};
CHECK(scan.lower_bound() || scan.upper_bound());
auto lower_depends = find_forbidden(scan.lower_bound());
auto upper_depends = find_forbidden(scan.upper_bound());
if (!lower_depends && !upper_depends) return true;
// Since we have dependencies, we need to extract them as a Filter. There
// are two cases:
// 1) Only one bound depends on forbidden symbols
// a) If the other bound exists, extract to
// ScanAllByLabelPropertyRange(only other bound) + Filter
// b) If there is no other bound, extract to ScanAllByLabel + Filter.
// 2) Both bounds depend on forbidden symbols
// a) Extract to ScanAllByLabel + Filter x2
auto make_prop_lookup = [&]() {
auto ident = storage_->Create<Identifier>(
scan.output_symbol().name(), scan.output_symbol().user_declared());
(*symbol_table_)[*ident] = scan.output_symbol();
return storage_->Create<PropertyLookup>(ident, "", scan.property());
};
Expression *extracted_filter = nullptr;
std::shared_ptr<ScanAll> new_scan;
if (lower_depends) {
// Extract the filtering expression
auto prop_lookup = make_prop_lookup();
if (scan.lower_bound()->IsInclusive()) {
extracted_filter = storage_->Create<GreaterEqualOperator>(
prop_lookup, scan.lower_bound()->value());
} else {
extracted_filter = storage_->Create<GreaterOperator>(
prop_lookup, scan.lower_bound()->value());
}
// Choose new scan operation
branch_.depends_on = lower_depends;
if (upper_depends || !scan.upper_bound()) {
// Cases 2) and 1.b)
new_scan =
std::make_shared<ScanAllByLabel>(scan.input(), scan.output_symbol(),
scan.label(), scan.graph_view());
} else {
// Case 1.a)
new_scan = std::make_shared<ScanAllByLabelPropertyRange>(
scan.input(), scan.output_symbol(), scan.label(), scan.property(),
std::experimental::nullopt, scan.upper_bound(), scan.graph_view());
}
}
if (upper_depends) {
Expression *filter;
auto prop_lookup = make_prop_lookup();
if (scan.upper_bound()->IsInclusive()) {
filter = storage_->Create<LessEqualOperator>(
prop_lookup, scan.upper_bound()->value());
} else {
filter = storage_->Create<LessOperator>(prop_lookup,
scan.upper_bound()->value());
}
if (lower_depends) {
CHECK(extracted_filter);
CHECK(new_scan);
CHECK(branch_.depends_on);
branch_.depends_on = std::min(*branch_.depends_on, *upper_depends);
extracted_filter =
storage_->Create<AndOperator>(extracted_filter, filter);
} else {
CHECK(!extracted_filter);
CHECK(!branch_.depends_on);
branch_.depends_on = upper_depends;
extracted_filter = filter;
if (scan.lower_bound()) {
UsedSymbolsCollector collector(*symbol_table_);
scan.lower_bound()->value()->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
// Case 1.a)
new_scan = std::make_shared<ScanAllByLabelPropertyRange>(
scan.input(), scan.output_symbol(), scan.label(), scan.property(),
scan.lower_bound(), std::experimental::nullopt,
scan.graph_view());
} else {
// Case 1.b)
new_scan = std::make_shared<ScanAllByLabel>(
scan.input(), scan.output_symbol(), scan.label(),
scan.graph_view());
}
}
if (scan.upper_bound()) {
UsedSymbolsCollector collector(*symbol_table_);
scan.upper_bound()->value()->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
CHECK(extracted_filter);
CHECK(new_scan);
CHECK(branch_.depends_on);
branch_.subtree = new_scan;
auto parent = std::make_shared<Filter>(new_scan, extracted_filter);
branch_.parent_end = parent.get();
// Wire the remaining operators above us into the new parent.
if (prev_ops_.empty()) {
branch_.parent_start = parent;
} else {
prev_ops_.back()->set_input(parent);
}
return true;
}
// Records `scan` on `prev_ops_`. The matching PostVisit pops it and then uses
// `prev_ops_.back()` (if any) to re-point that operator's input when the scan
// is split into ScanAllByLabel plus a Filter. Always returns true.
bool PreVisit(ScanAllByLabelPropertyValue &scan) override {
prev_ops_.push_back(&scan);
return true;
}
bool PostVisit(ScanAllByLabelPropertyValue &scan) override {
if (subtree_) return true;
prev_ops_.pop_back();
if (branch_.subtree) return true;
UsedSymbolsCollector collector(*symbol_table_);
scan.expression()->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
if (auto found = ContainsForbidden(collector.symbols_)) {
// Split to ScanAllByLabel + Filter on property
auto subtree = std::make_shared<ScanAllByLabel>(
scan.input(), scan.output_symbol(), scan.label(), scan.graph_view());
auto ident = storage_->Create<Identifier>(
scan.output_symbol().name(), scan.output_symbol().user_declared());
(*symbol_table_)[*ident] = scan.output_symbol();
auto prop_lookup =
storage_->Create<PropertyLookup>(ident, "", scan.property());
auto prop_equal =
storage_->Create<EqualOperator>(prop_lookup, scan.expression());
auto parent = std::make_shared<Filter>(subtree, prop_equal);
SetBranch(subtree, parent.get(), *found);
if (prev_ops_.empty()) {
branch_.parent_start = parent;
} else {
prev_ops_.back()->set_input(parent);
}
}
return true;
}
// Records `exp` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(Expand &exp) override {
prev_ops_.push_back(&exp);
return true;
}
bool PostVisit(Expand &exp) override {
if (subtree_) return true;
prev_ops_.pop_back();
if (branch_.subtree) return true;
if (auto found = FindForbidden(exp.input_symbol())) {
subtree_ = exp.input();
parent_ = &exp;
depends_on_ = found;
SetBranch(exp.input(), &exp, *found);
}
if (exp.existing_node()) {
if (auto found = FindForbidden(exp.node_symbol())) {
subtree_ = exp.input();
parent_ = &exp;
if (depends_on_) {
depends_on_ = std::min(*found, *depends_on_);
} else {
depends_on_ = found;
}
SetBranch(exp.input(), &exp, *found);
}
}
CHECK(!FindForbidden(exp.edge_symbol()))
@ -114,22 +258,19 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Records `exp` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(ExpandVariable &exp) override {
prev_ops_.push_back(&exp);
return true;
}
bool PostVisit(ExpandVariable &exp) override {
if (subtree_) return true;
prev_ops_.pop_back();
if (branch_.subtree) return true;
if (auto found = FindForbidden(exp.input_symbol())) {
subtree_ = exp.input();
parent_ = &exp;
depends_on_ = found;
SetBranch(exp.input(), &exp, *found);
}
if (exp.existing_node()) {
if (auto found = FindForbidden(exp.node_symbol())) {
subtree_ = exp.input();
parent_ = &exp;
if (depends_on_) {
depends_on_ = std::min(*found, *depends_on_);
} else {
depends_on_ = found;
}
SetBranch(exp.input(), &exp, *found);
}
}
CHECK(!FindForbidden(exp.edge_symbol()))
@ -138,26 +279,23 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
if (exp.lower_bound()) {
UsedSymbolsCollector collector(*symbol_table_);
exp.lower_bound()->Accept(collector);
// TODO: Figure out what do to here. Perhaps we shouldn't allow variables
// in bound expressions.
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
if (auto found = ContainsForbidden(collector.symbols_)) {
SetBranch(exp.input(), &exp, *found);
}
}
if (exp.upper_bound()) {
UsedSymbolsCollector collector(*symbol_table_);
exp.upper_bound()->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
if (auto found = ContainsForbidden(collector.symbols_)) {
SetBranch(exp.input(), &exp, *found);
}
}
// Check for lambda expressions
if (exp.filter_lambda().expression) {
UsedSymbolsCollector collector(*symbol_table_);
exp.filter_lambda().expression->Accept(collector);
// TODO: Extract filters or have them be inlined later in the planning.
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
if (auto found = ContainsForbidden(collector.symbols_)) {
SetBranch(exp.input(), &exp, *found);
}
}
if (exp.weight_lambda()) {
@ -165,72 +303,77 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
<< "Unexpected nullptr expression in lambda";
UsedSymbolsCollector collector(*symbol_table_);
exp.weight_lambda()->expression->Accept(collector);
// TODO: Figure out what to do here
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
if (auto found = ContainsForbidden(collector.symbols_)) {
SetBranch(exp.input(), &exp, *found);
}
}
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(ExpandUniquenessFilter<EdgeAccessor> &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(ExpandUniquenessFilter<EdgeAccessor> &op) override {
if (subtree_) return true;
prev_ops_.pop_back();
if (branch_.subtree) return true;
if (auto found = FindForbidden(op.expand_symbol())) {
subtree_ = op.input();
parent_ = &op;
depends_on_ = found;
SetBranch(op.input(), &op, *found);
}
if (auto found = ContainsForbidden(op.previous_symbols())) {
subtree_ = op.input();
parent_ = &op;
if (depends_on_) {
depends_on_ = std::min(*found, *depends_on_);
} else {
depends_on_ = found;
}
SetBranch(op.input(), &op, *found);
}
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(Filter &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(Filter &op) override {
if (subtree_) return true;
prev_ops_.pop_back();
if (branch_.subtree) return true;
UsedSymbolsCollector collector(*symbol_table_);
op.expression()->Accept(collector);
if (auto found = ContainsForbidden(collector.symbols_)) {
subtree_ = op.input();
parent_ = &op;
depends_on_ = found;
SetBranch(op.input(), &op, *found);
}
return true;
}
// Records `produce` on `prev_ops_` for the duration of its visit (popped again
// in the matching PostVisit). Always returns true.
bool PreVisit(Produce &produce) override {
prev_ops_.push_back(&produce);
return true;
}
bool PostVisit(Produce &produce) override {
if (subtree_) return true;
prev_ops_.pop_back();
if (branch_.subtree) return true;
for (auto *named_expr : produce.named_expressions()) {
UsedSymbolsCollector collector(*symbol_table_);
named_expr->expression_->Accept(collector);
if (auto found = ContainsForbidden(collector.symbols_)) {
subtree_ = produce.input();
parent_ = &produce;
depends_on_ = found;
SetBranch(produce.input(), &produce, *found);
break;
}
}
return true;
}
bool PostVisit(Optional &optional) override {
bool PreVisit(Optional &optional) override {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
// Records `unwind` on `prev_ops_` for the duration of its visit (popped again
// in the matching PostVisit). Always returns true.
bool PreVisit(Unwind &unwind) override {
prev_ops_.push_back(&unwind);
return true;
}
bool PostVisit(Unwind &unwind) override {
if (subtree_) return true;
prev_ops_.pop_back();
if (branch_.subtree) return true;
UsedSymbolsCollector collector(*symbol_table_);
unwind.input_expression()->Accept(collector);
if (auto found = ContainsForbidden(collector.symbols_)) {
subtree_ = unwind.input();
parent_ = &unwind;
depends_on_ = found;
SetBranch(unwind.input(), &unwind, *found);
}
return true;
}
@ -239,7 +382,12 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
// symbols. This is the case when we are planning *another* Cartesian after
// Produce.
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(CreateNode &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(CreateNode &op) override {
prev_ops_.pop_back();
CHECK(!FindForbidden(symbol_table_->at(*(op.node_atom()->identifier_))));
for (auto &kv : op.node_atom()->properties_) {
UsedSymbolsCollector collector(*symbol_table_);
@ -249,7 +397,12 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(CreateExpand &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(CreateExpand &op) override {
prev_ops_.pop_back();
CHECK(!FindForbidden(op.input_symbol()));
CHECK(!FindForbidden(symbol_table_->at(*(op.node_atom()->identifier_))));
CHECK(!FindForbidden(symbol_table_->at(*(op.edge_atom()->identifier_))));
@ -266,7 +419,12 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(Delete &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(Delete &op) override {
prev_ops_.pop_back();
for (auto *expr : op.expressions()) {
UsedSymbolsCollector collector(*symbol_table_);
expr->Accept(collector);
@ -275,7 +433,12 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(SetProperty &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(SetProperty &op) override {
prev_ops_.pop_back();
UsedSymbolsCollector collector(*symbol_table_);
op.lhs()->Accept(collector);
op.rhs()->Accept(collector);
@ -283,7 +446,12 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(SetProperties &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(SetProperties &op) override {
prev_ops_.pop_back();
CHECK(!FindForbidden(op.input_symbol()));
UsedSymbolsCollector collector(*symbol_table_);
op.rhs()->Accept(collector);
@ -291,24 +459,44 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(SetLabels &op) override {
prev_ops_.push_back(&op);
return true;
}
// Removes the entry pushed by the matching PreVisit, then CHECK-fails if the
// operator's input symbol is found among the forbidden symbols. SetLabels is
// not split into a branch; a dependency here is treated as a planner bug.
bool PostVisit(SetLabels &op) override {
prev_ops_.pop_back();
CHECK(!FindForbidden(op.input_symbol()));
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(RemoveProperty &op) override {
prev_ops_.push_back(&op);
return true;
}
// Removes the entry pushed by the matching PreVisit, collects the symbols used
// by the left-hand side expression and CHECK-fails if any of them is
// forbidden. Always returns true otherwise.
bool PostVisit(RemoveProperty &op) override {
prev_ops_.pop_back();
UsedSymbolsCollector collector(*symbol_table_);
op.lhs()->Accept(collector);
CHECK(!ContainsForbidden(collector.symbols_));
return true;
}
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(RemoveLabels &op) override {
prev_ops_.push_back(&op);
return true;
}
// Removes the entry pushed by the matching PreVisit, then CHECK-fails if the
// operator's input symbol is found among the forbidden symbols.
bool PostVisit(RemoveLabels &op) override {
prev_ops_.pop_back();
CHECK(!FindForbidden(op.input_symbol()));
return true;
}
// Records `aggr` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(Aggregate &aggr) override {
prev_ops_.push_back(&aggr);
return true;
}
bool PostVisit(Aggregate &aggr) override {
prev_ops_.pop_back();
CHECK(!ContainsForbidden(aggr.remember()));
for (auto &elem : aggr.aggregations()) {
UsedSymbolsCollector collector(*symbol_table_);
@ -324,21 +512,36 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Records `skip` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(Skip &skip) override {
prev_ops_.push_back(&skip);
return true;
}
// Removes the entry pushed by the matching PreVisit, collects the symbols used
// by the skip expression and CHECK-fails if any of them is forbidden.
bool PostVisit(Skip &skip) override {
prev_ops_.pop_back();
UsedSymbolsCollector collector(*symbol_table_);
skip.expression()->Accept(collector);
CHECK(!ContainsForbidden(collector.symbols_));
return true;
}
// Records `limit` on `prev_ops_` for the duration of its visit (popped again
// in the matching PostVisit). Always returns true.
bool PreVisit(Limit &limit) override {
prev_ops_.push_back(&limit);
return true;
}
// Removes the entry pushed by the matching PreVisit, collects the symbols used
// by the limit expression and CHECK-fails if any of them is forbidden.
bool PostVisit(Limit &limit) override {
prev_ops_.pop_back();
UsedSymbolsCollector collector(*symbol_table_);
limit.expression()->Accept(collector);
CHECK(!ContainsForbidden(collector.symbols_));
return true;
}
// Records `order_by` on `prev_ops_` for the duration of its visit (popped
// again in the matching PostVisit). Always returns true.
bool PreVisit(OrderBy &order_by) override {
prev_ops_.push_back(&order_by);
return true;
}
bool PostVisit(OrderBy &order_by) override {
prev_ops_.pop_back();
CHECK(!ContainsForbidden(order_by.output_symbols()));
for (auto *expr : order_by.order_by()) {
UsedSymbolsCollector collector(*symbol_table_);
@ -348,25 +551,52 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Records `distinct` on `prev_ops_` for the duration of its visit (popped
// again in the matching PostVisit). Always returns true.
bool PreVisit(Distinct &distinct) override {
prev_ops_.push_back(&distinct);
return true;
}
// Removes the entry pushed by the matching PreVisit, then CHECK-fails if any
// of the operator's value symbols is forbidden.
bool PostVisit(Distinct &distinct) override {
prev_ops_.pop_back();
CHECK(!ContainsForbidden(distinct.value_symbols()));
return true;
}
// Records `cart` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(Cartesian &cart) override {
prev_ops_.push_back(&cart);
return true;
}
// Removes the entry pushed by the matching PreVisit, then CHECK-fails if
// either the left or the right symbol list of an already planned Cartesian
// contains a forbidden symbol.
bool PostVisit(Cartesian &cart) override {
prev_ops_.pop_back();
CHECK(!ContainsForbidden(cart.left_symbols()) &&
!ContainsForbidden(cart.right_symbols()));
return true;
}
bool PostVisit(Synchronize &op) override { return true; }
// Records `op` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(Synchronize &op) override {
prev_ops_.push_back(&op);
return true;
}
// Removes the entry pushed by the matching PreVisit; no symbol checks are
// performed for Synchronize. Always returns true.
bool PostVisit(Synchronize &op) override {
prev_ops_.pop_back();
return true;
}
// Records `pull` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(PullRemote &pull) override {
prev_ops_.push_back(&pull);
return true;
}
// Removes the entry pushed by the matching PreVisit, then CHECK-fails if any
// of the pulled symbols is forbidden.
bool PostVisit(PullRemote &pull) override {
prev_ops_.pop_back();
CHECK(!ContainsForbidden(pull.symbols()));
return true;
}
// Records `pull` on `prev_ops_` for the duration of its visit (popped again in
// the matching PostVisit). Always returns true.
bool PreVisit(PullRemoteOrderBy &pull) override {
prev_ops_.push_back(&pull);
return true;
}
bool PostVisit(PullRemoteOrderBy &pull) override {
prev_ops_.pop_back();
CHECK(!ContainsForbidden(pull.symbols()));
for (auto *expr : pull.order_by()) {
UsedSymbolsCollector collector(*symbol_table_);
@ -376,21 +606,22 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Independent subtree
std::shared_ptr<LogicalOperator> subtree_;
// Immediate parent of `subtree_`.
LogicalOperator *parent_{nullptr};
// Minimum index into forbidden_symbols_
std::experimental::optional<int64_t> depends_on_;
Branch branch_;
std::vector<LogicalOperator *> prev_ops_;
protected:
// Fallback pre-visit hook: unconditionally throws utils::NotYetImplemented.
// Presumably reached for operator types without an explicit PreVisit overload
// in this class -- confirm against HierarchicalLogicalOperatorVisitor.
bool DefaultPreVisit() override {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
// Fallback post-visit hook: unconditionally throws utils::NotYetImplemented.
// Presumably reached for operator types without an explicit PostVisit overload
// in this class -- confirm against HierarchicalLogicalOperatorVisitor.
bool DefaultPostVisit() override {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
private:
std::vector<std::vector<Symbol>> forbidden_symbols_;
const SymbolTable *symbol_table_;
SymbolTable *symbol_table_;
AstStorage *storage_;
template <class TCollection>
std::experimental::optional<int64_t> ContainsForbidden(
@ -413,12 +644,23 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
}
return std::experimental::nullopt;
}
// Stores `subtree` and `parent_end` in `branch_` and folds `depends_on` into
// `branch_.depends_on`, so that after repeated calls the recorded index is the
// smallest one seen.
void SetBranch(std::shared_ptr<LogicalOperator> subtree,
               LogicalOperator *parent_end, int64_t depends_on) {
  branch_.subtree = subtree;
  branch_.parent_end = parent_end;
  // First call: take the index as is. Later calls: keep the minimum.
  const int64_t lowest_index =
      branch_.depends_on ? std::min(depends_on, *branch_.depends_on)
                         : depends_on;
  branch_.depends_on = lowest_index;
}
};
// Find the subtree parent, below which no operator uses symbols found in the
// `forbidden_symbols` set.
//
// A tuple is returned (subtree, parent, depends_on), where the parent may be
// Branch is returned {subtree, parent, depends_on}, where the parent may be
// nullptr if the given `op` is already a subtree which doesn't use
// `forbidden_symbols`. `depends_on` is set to the minimum index of the
// `forbidden_symbols` that the operators above the `subtree` depend on. The
@ -427,11 +669,19 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
auto FindIndependentSubtree(
const std::shared_ptr<LogicalOperator> &op,
const std::vector<std::vector<Symbol>> &forbidden_symbols,
const SymbolTable *symbol_table) {
IndependentSubtreeFinder finder(forbidden_symbols, symbol_table);
SymbolTable *symbol_table, AstStorage *storage) {
IndependentSubtreeFinder finder(forbidden_symbols, symbol_table, storage);
op->Accept(finder);
auto subtree = finder.subtree_ ? finder.subtree_ : op;
return std::make_tuple(subtree, finder.parent_, finder.depends_on_);
CHECK(finder.prev_ops_.empty());
if (!finder.branch_.subtree) {
finder.branch_.subtree = op;
}
if (finder.branch_.parent_end && !finder.branch_.parent_start) {
// This branch depends on another, so we need to store the parent subtree
// which contains that dependency.
finder.branch_.parent_start = op;
}
return finder.branch_;
}
class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
@ -503,16 +753,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
//
// Inserting the Cartesian operator is done through PlanCartesian while
// post-visiting Produce, Aggregate or write operators.
//
// TODO: Finish Cartesian planning:
// * splitting indexed ScanAll;
// * allowing Cartesian after Produce (i.e. after WITH clause);
// * ...
auto PlanCartesian(const std::shared_ptr<LogicalOperator> &rhs_input) {
std::shared_ptr<LogicalOperator> cartesian;
auto right_branch = MakeCartesianBranch(rhs_input);
auto right_op = right_branch.subtree;
std::vector<CartesianBranch> dependent_branches;
std::vector<Branch> dependent_branches;
dependent_branches.reserve(cartesian_branches_.size() + 1);
if (right_branch.parent_start) {
dependent_branches.push_back(right_branch);
@ -1118,33 +1363,12 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
private:
struct CartesianBranch {
// Independent subtree that can become a Cartesian branch
std::shared_ptr<LogicalOperator> subtree;
// Optional parent of the subtree which needs to come after the Cartesian
// operator, because it depends on some other Cartesian branch.
// parent_start is shared_ptr, because the ownership will be transfered to
// some other operator.
std::shared_ptr<LogicalOperator> parent_start;
// parent_end is pointer, because we may only change its input.
LogicalOperator *parent_end{nullptr};
// Minimum index of the branch this parent depends on.
std::experimental::optional<int64_t> depends_on;
};
CartesianBranch MakeCartesianBranch(
const std::shared_ptr<LogicalOperator> &input) {
CartesianBranch branch;
std::tie(branch.subtree, branch.parent_end, branch.depends_on) =
FindIndependentSubtree(input, cartesian_symbols_,
&distributed_plan_.symbol_table);
if (branch.parent_end) {
// This branch depends on another, so we need store the parent subtree
// which contains that dependency.
branch.parent_start = input;
}
Branch MakeCartesianBranch(const std::shared_ptr<LogicalOperator> &input) {
auto branch = FindIndependentSubtree(input, cartesian_symbols_,
&distributed_plan_.symbol_table,
&distributed_plan_.ast_storage);
if (on_master_ && cartesian_branches_.empty()) {
// Since we are planning a new Cartesian, the first CartesianBranch must
// Since we are planning a new Cartesian, the first Branch must
// not be sent to workers if we are executing on master.
return branch;
}
@ -1160,9 +1384,9 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
std::atomic<int64_t> &next_plan_id_;
std::vector<LogicalOperator *> prev_ops_;
// Operators that still need to be wired into Cartesian.
std::vector<CartesianBranch> cartesian_branches_;
std::vector<Branch> cartesian_branches_;
// Symbols modified by the currently stored Cartesian branches. Each vector
// corresponds to the above CartesianBranch.
// corresponds to the above Cartesian branch.
std::vector<std::vector<Symbol>> cartesian_symbols_;
bool has_scan_all_ = false;
bool needs_synchronize_ = false;

View File

@ -2032,7 +2032,6 @@ and returns true, once.")
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
// TODO: Consider whether we want to treat Optional as having single input.
bool HasSingleInput() const override { return true; }
std::shared_ptr<LogicalOperator> input() const override { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) override {
@ -2439,8 +2438,7 @@ by having only one result from each worker.")
:capnp-save #'save-operator-pointer
:capnp-load #'load-operator-pointer)
(username "std::string" :reader t)
(password "Expression *"
:reader t
(password "Expression *" :reader t
:save-fun #'save-pointer
:load-fun #'load-pointer
:capnp-type "Ast.Tree"

View File

@ -29,8 +29,6 @@ namespace query {
}
} // namespace query
using std::string_literals::operator""s;
using namespace query::plan;
using query::AstStorage;
using query::SingleQuery;
@ -2318,7 +2316,7 @@ TYPED_TEST(TestPlanner, DistributedMatchCreateReturn) {
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
TYPED_TEST(TestPlanner, DistributedCartesianCreate) {
TYPED_TEST(TestPlanner, DistributedCartesianCreateExpand) {
// Test MATCH (a), (b) CREATE (a)-[e:r]->(b) RETURN e
AstStorage storage;
FakeDbAccessor dba;
@ -2450,6 +2448,216 @@ TYPED_TEST(TestPlanner, DistributedCartesianFilter) {
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
// Checks distributed planning of a Cartesian where the property equality on
// `b` refers to `a` from the other branch: the expected plan scans `b` by
// label only inside the Cartesian and applies a Filter above the Cartesian.
TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByProperty) {
// Test MATCH (a), (b :label) WHERE b.prop = a RETURN b
AstStorage storage;
FakeDbAccessor dba;
auto label = dba.Label("label");
auto prop = dba.Property("prop");
// Set indexes so that lookup by property is preferred.
dba.SetIndexCount(label, 1024);
dba.SetIndexCount(label, prop, 0);
auto *node_a = NODE("a");
auto *node_b = NODE("b", label);
QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b)),
WHERE(EQ(PROPERTY_LOOKUP("b", prop), IDENT("a"))),
RETURN("b")));
auto symbol_table = MakeSymbolTable(*storage.query());
auto sym_a = symbol_table.at(*node_a->identifier_);
auto sym_b = symbol_table.at(*node_b->identifier_);
// Left Cartesian branch: plain scan of `a`, pulled from workers.
auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
// We still expect only indexed lookup by label because property depends on
// Cartesian branch.
auto right_cart =
MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
// First checker list is the master plan; the remaining two are the expected
// worker plans, one per Cartesian branch.
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
ExpectFilter(), ExpectProduce()),
MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
// Checks distributed planning of a Cartesian where the only (lower) bound on
// `b.prop` refers to `a` from the other branch: the expected plan scans `b` by
// label only inside the Cartesian and applies a Filter above the Cartesian.
TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByLowerBound) {
// Test MATCH (a), (b :label) WHERE a < b.prop RETURN b
AstStorage storage;
FakeDbAccessor dba;
auto label = dba.Label("label");
auto prop = dba.Property("prop");
// Set indexes so that lookup by property is preferred.
dba.SetIndexCount(label, 1024);
dba.SetIndexCount(label, prop, 0);
auto *node_a = NODE("a");
auto *node_b = NODE("b", label);
QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b)),
WHERE(LESS(IDENT("a"), PROPERTY_LOOKUP("b", prop))),
RETURN("b")));
auto symbol_table = MakeSymbolTable(*storage.query());
auto sym_a = symbol_table.at(*node_a->identifier_);
auto sym_b = symbol_table.at(*node_b->identifier_);
// Left Cartesian branch: plain scan of `a`, pulled from workers.
auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
// We still expect only indexed lookup by label because lower bound depends on
// Cartesian branch.
auto right_cart =
MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
// First checker list is the master plan; the remaining two are the expected
// worker plans, one per Cartesian branch.
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
ExpectFilter(), ExpectProduce()),
MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
// Checks distributed planning of a Cartesian where the only (upper) bound on
// `b.prop` refers to `a` from the other branch: the expected plan scans `b` by
// label only inside the Cartesian and applies a Filter above the Cartesian.
TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByUpperBound) {
// Test MATCH (a), (b :label) WHERE a > b.prop RETURN b
AstStorage storage;
FakeDbAccessor dba;
auto label = dba.Label("label");
auto prop = dba.Property("prop");
// Set indexes so that lookup by property is preferred.
dba.SetIndexCount(label, 1024);
dba.SetIndexCount(label, prop, 0);
auto *node_a = NODE("a");
auto *node_b = NODE("b", label);
QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b)),
WHERE(GREATER(IDENT("a"), PROPERTY_LOOKUP("b", prop))),
RETURN("b")));
auto symbol_table = MakeSymbolTable(*storage.query());
auto sym_a = symbol_table.at(*node_a->identifier_);
auto sym_b = symbol_table.at(*node_b->identifier_);
// Left Cartesian branch: plain scan of `a`, pulled from workers.
auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
// We still expect only indexed lookup by label because upper bound depends on
// Cartesian branch.
auto right_cart =
MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
// First checker list is the master plan; the remaining two are the expected
// worker plans, one per Cartesian branch.
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
ExpectFilter(), ExpectProduce()),
MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
// Checks the case where both range bounds on `b.prop` refer to `a`. The input
// plan is assembled by hand (Produce <- ScanAllByLabelPropertyRange(b) <-
// ScanAll(a)) rather than produced by a planner, so this is a plain TEST and
// not a TYPED_TEST.
TEST(TestPlanner, DistributedCartesianIndexedScanByBothBounds) {
// Test MATCH (a), (b :label) WHERE a > b.prop > a RETURN b
AstStorage storage;
FakeDbAccessor dba;
auto label = dba.Label("label");
auto prop = dba.Property("prop");
// Set indexes so that lookup by property is preferred.
// NOTE(review): the plan below is built directly and `dba` is not passed to
// CheckDistributedPlan, so these index counts look unused here -- confirm.
dba.SetIndexCount(label, 1024);
dba.SetIndexCount(label, prop, 0);
SymbolTable symbol_table;
auto sym_a = symbol_table.CreateSymbol("a", true);
auto scan_a = std::make_shared<ScanAll>(nullptr, sym_a);
auto sym_b = symbol_table.CreateSymbol("b", true);
// Both bounds are exclusive and are separate identifier nodes bound to `a`.
query::Expression *lower_expr = IDENT("a");
symbol_table[*lower_expr] = sym_a;
auto lower_bound = utils::MakeBoundExclusive(lower_expr);
query::Expression *upper_expr = IDENT("a");
symbol_table[*upper_expr] = sym_a;
auto upper_bound = utils::MakeBoundExclusive(upper_expr);
auto scan_b = std::make_shared<ScanAllByLabelPropertyRange>(
scan_a, sym_b, label, prop, lower_bound, upper_bound);
auto ident_b = IDENT("b");
symbol_table[*ident_b] = sym_b;
auto as_b = NEXPR("b", ident_b);
auto produce = std::make_shared<Produce>(
scan_b, std::vector<query::NamedExpression *>{as_b});
auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
// We still expect only indexed lookup by label because both bounds depend on
// Cartesian branch.
auto right_cart =
MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
ExpectFilter(), ExpectProduce()),
MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
CheckDistributedPlan(*produce, symbol_table, expected);
}
// Checks the case where the range scan has both bounds but only the upper one
// refers to `a`: the expected plan keeps a range scan restricted to the
// literal lower bound and applies a Filter above the Cartesian. The input plan
// is assembled by hand rather than produced by a planner.
TEST(TestPlanner, DistributedCartesianIndexedScanByLowerWithBothBounds) {
// Test MATCH (a), (b :label) WHERE a > b.prop > 42 RETURN b
AstStorage storage;
FakeDbAccessor dba;
auto label = dba.Label("label");
auto prop = dba.Property("prop");
// Set indexes so that lookup by property is preferred.
// NOTE(review): the plan below is built directly and `dba` is not passed to
// CheckDistributedPlan, so these index counts look unused here -- confirm.
dba.SetIndexCount(label, 1024);
dba.SetIndexCount(label, prop, 0);
SymbolTable symbol_table;
auto sym_a = symbol_table.CreateSymbol("a", true);
auto scan_a = std::make_shared<ScanAll>(nullptr, sym_a);
auto sym_b = symbol_table.CreateSymbol("b", true);
// Lower bound is a literal (independent); upper bound refers to `a`.
query::Expression *lower_expr = LITERAL(42);
auto lower_bound = utils::MakeBoundExclusive(lower_expr);
query::Expression *upper_expr = IDENT("a");
symbol_table[*upper_expr] = sym_a;
auto upper_bound = utils::MakeBoundExclusive(upper_expr);
auto scan_b = std::make_shared<ScanAllByLabelPropertyRange>(
scan_a, sym_b, label, prop, lower_bound, upper_bound);
auto ident_b = IDENT("b");
symbol_table[*ident_b] = sym_b;
auto as_b = NEXPR("b", ident_b);
auto produce = std::make_shared<Produce>(
scan_b, std::vector<query::NamedExpression *>{as_b});
auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
// We still expect indexed lookup by label property range above lower bound,
// because upper bound depends on Cartesian branch.
auto right_cart =
MakeCheckers(ExpectScanAllByLabelPropertyRange(
label, prop, lower_bound, std::experimental::nullopt),
ExpectPullRemote({sym_b}));
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
ExpectFilter(), ExpectProduce()),
MakeCheckers(ExpectScanAll()),
MakeCheckers(ExpectScanAllByLabelPropertyRange(
label, prop, lower_bound, std::experimental::nullopt)));
CheckDistributedPlan(*produce, symbol_table, expected);
}
// Checks the case where the range scan has both bounds but only the lower one
// refers to `a`: the expected plan keeps a range scan restricted to the
// literal upper bound and applies a Filter above the Cartesian. The input plan
// is assembled by hand rather than produced by a planner.
TEST(TestPlanner, DistributedCartesianIndexedScanByUpperWithBothBounds) {
// Test MATCH (a), (b :label) WHERE 42 > b.prop > a RETURN b
AstStorage storage;
FakeDbAccessor dba;
auto label = dba.Label("label");
auto prop = dba.Property("prop");
// Set indexes so that lookup by property is preferred.
// NOTE(review): the plan below is built directly and `dba` is not passed to
// CheckDistributedPlan, so these index counts look unused here -- confirm.
dba.SetIndexCount(label, 1024);
dba.SetIndexCount(label, prop, 0);
SymbolTable symbol_table;
auto sym_a = symbol_table.CreateSymbol("a", true);
auto scan_a = std::make_shared<ScanAll>(nullptr, sym_a);
auto sym_b = symbol_table.CreateSymbol("b", true);
// Lower bound refers to `a`; upper bound is a literal (independent).
query::Expression *lower_expr = IDENT("a");
symbol_table[*lower_expr] = sym_a;
auto lower_bound = utils::MakeBoundExclusive(lower_expr);
query::Expression *upper_expr = LITERAL(42);
auto upper_bound = utils::MakeBoundExclusive(upper_expr);
auto scan_b = std::make_shared<ScanAllByLabelPropertyRange>(
scan_a, sym_b, label, prop, lower_bound, upper_bound);
auto ident_b = IDENT("b");
symbol_table[*ident_b] = sym_b;
auto as_b = NEXPR("b", ident_b);
auto produce = std::make_shared<Produce>(
scan_b, std::vector<query::NamedExpression *>{as_b});
auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
// We still expect indexed lookup by label property range below upper bound,
// because lower bound depends on Cartesian branch.
auto right_cart =
MakeCheckers(ExpectScanAllByLabelPropertyRange(
label, prop, std::experimental::nullopt, upper_bound),
ExpectPullRemote({sym_b}));
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
ExpectFilter(), ExpectProduce()),
MakeCheckers(ExpectScanAll()),
MakeCheckers(ExpectScanAllByLabelPropertyRange(
label, prop, std::experimental::nullopt, upper_bound)));
CheckDistributedPlan(*produce, symbol_table, expected);
}
TYPED_TEST(TestPlanner, DistributedCartesianProduce) {
// Test MATCH (a) WITH a MATCH (b) WHERE b = a RETURN b;
AstStorage storage;