Detect unsupported and dependant cases in Cartesian

Summary:
This is the initial step to getting a correct version of distributed
planning of Cartesian operator. Functions and structs have been added
which should collect enough information to correctly order the execution
with regards to dependencies among Cartesian branches. The support
functionality should be the same as was before, but unsupported cases
should now raise an exception instead of leading to undefined behaviour.

Reviewers: msantl, mtomic, buda

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1418
This commit is contained in:
Teon Banek 2018-06-05 15:51:28 +02:00
parent bbd96b25e2
commit 2721c40a0c
2 changed files with 225 additions and 56 deletions

View File

@ -40,6 +40,146 @@ int64_t AddWorkerPlan(DistributedPlan &distributed_plan,
return plan_id;
}
// Find the subtree parent, below which no operator uses symbols found in the
// `forbidden_symbols` set.
class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
public:
IndependentSubtreeFinder(const std::vector<Symbol> &forbidden_symbols,
const SymbolTable *symbol_table)
: forbidden_symbols_(forbidden_symbols), symbol_table_(symbol_table) {}
using HierarchicalLogicalOperatorVisitor::PostVisit;
using HierarchicalLogicalOperatorVisitor::PreVisit;
using HierarchicalLogicalOperatorVisitor::Visit;
// These don't use any symbols
bool Visit(Once &) override { return true; }
bool Visit(CreateIndex &) override { return true; }
bool PostVisit(ScanAll &scan) override { return true; }
bool PostVisit(ScanAllByLabel &scan) override { return true; }
bool PostVisit(ScanAllByLabelPropertyRange &scan) override {
if (subtree_) return true;
if (scan.lower_bound()) {
UsedSymbolsCollector collector(*symbol_table_);
scan.lower_bound()->value()->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
}
if (scan.upper_bound()) {
UsedSymbolsCollector collector(*symbol_table_);
scan.upper_bound()->value()->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
}
return true;
}
bool PostVisit(ScanAllByLabelPropertyValue &scan) override {
if (subtree_) return true;
UsedSymbolsCollector collector(*symbol_table_);
scan.expression()->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
return true;
}
bool PostVisit(Expand &exp) override {
if (subtree_) return true;
if (IsForbidden(exp.input_symbol())) {
subtree_ = exp.input();
parent_ = &exp;
} else if (exp.existing_node() && IsForbidden(exp.node_symbol())) {
subtree_ = exp.input();
parent_ = &exp;
}
CHECK(!IsForbidden(exp.edge_symbol()))
<< "Expand uses an already used edge symbol.";
return true;
}
bool PostVisit(ExpandUniquenessFilter<EdgeAccessor> &op) override {
if (subtree_) return true;
if (IsForbidden(op.expand_symbol()) ||
ContainsForbidden(op.previous_symbols())) {
subtree_ = op.input();
parent_ = &op;
}
return true;
}
bool PostVisit(Filter &op) override {
if (subtree_) return true;
UsedSymbolsCollector collector(*symbol_table_);
op.expression()->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
subtree_ = op.input();
parent_ = &op;
}
return true;
}
bool PostVisit(Produce &produce) override {
if (subtree_) return true;
for (auto *named_expr : produce.named_expressions()) {
UsedSymbolsCollector collector(*symbol_table_);
named_expr->expression_->Accept(collector);
if (ContainsForbidden(collector.symbols_)) {
subtree_ = produce.input();
parent_ = &produce;
break;
}
}
return true;
}
// Independent subtree
std::shared_ptr<LogicalOperator> subtree_;
// Immediate parent of `subtree_`.
LogicalOperator *parent_{nullptr};
protected:
bool DefaultPostVisit() override {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
template <class TCollection>
bool ContainsForbidden(const TCollection &symbols) {
for (const auto &symbol : symbols) {
if (IsForbidden(symbol)) {
return true;
}
}
return false;
}
bool IsForbidden(const Symbol &symbol) {
return utils::Contains(forbidden_symbols_, symbol);
}
private:
std::vector<Symbol> forbidden_symbols_;
const SymbolTable *symbol_table_;
};
// Find the subtree parent, below which no operator uses symbols found in the
// `forbidden_symbols` set.
//
// A pair is returned (subtree, parent), where the parent may be nullptr if the
// given `op` is already a subtree which doesn't use `forbidden_symbols`.
auto FindIndependentSubtree(const std::shared_ptr<LogicalOperator> &op,
const std::vector<Symbol> &forbidden_symbols,
const SymbolTable *symbol_table) {
IndependentSubtreeFinder finder(forbidden_symbols, symbol_table);
op->Accept(finder);
auto subtree = finder.subtree_ ? finder.subtree_ : op;
return std::make_pair(subtree, finder.parent_);
}
class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
public:
DistributedPlanner(DistributedPlan &distributed_plan,
@ -105,27 +245,45 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
// * checking Expand into existing can be used;
// * allowing Cartesian after Produce (i.e. after WITH clause);
// * ...
std::shared_ptr<Cartesian> PlanCartesian(
const std::shared_ptr<LogicalOperator> &rhs_input) {
std::shared_ptr<Cartesian> cartesian;
auto pull_id = AddWorkerPlan(rhs_input);
std::shared_ptr<LogicalOperator> right_op = std::make_shared<PullRemote>(
rhs_input, pull_id,
rhs_input->ModifiedSymbols(distributed_plan_.symbol_table));
auto PlanCartesian(const std::shared_ptr<LogicalOperator> &rhs_input) {
std::shared_ptr<LogicalOperator> cartesian;
auto right_branch = MakeCartesianBranch(rhs_input);
auto right_op = right_branch.subtree;
// We use this ordering of operators, so that left hand side can be
// accumulated without having whole product accumulations. This (obviously)
// relies on the fact that Cartesian accumulation strategy accumulates the
// left operator input.
while (!left_cartesians_.empty()) {
auto left_op = left_cartesians_.back();
left_cartesians_.pop_back();
while (!cartesian_branches_.empty()) {
auto left_branch = cartesian_branches_.back();
if (left_branch.parent_start) {
throw utils::NotYetImplemented("distributed Cartesian planning");
}
cartesian_branches_.pop_back();
cartesian = std::make_shared<Cartesian>(
left_op, left_op->ModifiedSymbols(distributed_plan_.symbol_table),
left_branch.subtree,
left_branch.subtree->ModifiedSymbols(distributed_plan_.symbol_table),
right_op, right_op->ModifiedSymbols(distributed_plan_.symbol_table));
right_op = cartesian;
}
if (right_branch.parent_start) {
right_branch.parent_end->set_input(cartesian);
cartesian = right_branch.parent_start;
}
cartesian_symbols_.clear();
return cartesian;
};
}
void AddForCartesian(ScanAll *scan) {
cartesian_branches_.emplace_back(MakeCartesianBranch(scan->input()));
// Collect modified symbols of the whole branch (independent subtree +
// parent subtree).
auto modified_symbols =
scan->input()->ModifiedSymbols(distributed_plan_.symbol_table);
cartesian_symbols_.insert(cartesian_symbols_.end(),
modified_symbols.begin(), modified_symbols.end());
// Rewire the scan to be cut from the branch.
scan->set_input(std::make_shared<Once>());
}
bool PreVisit(ScanAll &scan) override {
prev_ops_.push_back(&scan);
@ -135,13 +293,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.pop_back();
should_split_ = true;
if (has_scan_all_) {
// Prepare for Cartesian planning
auto id = AddWorkerPlan(scan.input());
auto left_op = std::make_shared<PullRemote>(
scan.input(), id,
scan.input()->ModifiedSymbols(distributed_plan_.symbol_table));
left_cartesians_.push_back(left_op);
scan.set_input(std::make_shared<Once>());
AddForCartesian(&scan);
}
has_scan_all_ = true;
return true;
@ -155,13 +307,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.pop_back();
should_split_ = true;
if (has_scan_all_) {
// Prepare for Cartesian planning
auto id = AddWorkerPlan(scan.input());
auto left_op = std::make_shared<PullRemote>(
scan.input(), id,
scan.input()->ModifiedSymbols(distributed_plan_.symbol_table));
left_cartesians_.push_back(left_op);
scan.set_input(std::make_shared<Once>());
AddForCartesian(&scan);
}
has_scan_all_ = true;
return true;
@ -174,13 +320,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.pop_back();
should_split_ = true;
if (has_scan_all_) {
// Prepare for Cartesian planning
auto id = AddWorkerPlan(scan.input());
auto left_op = std::make_shared<PullRemote>(
scan.input(), id,
scan.input()->ModifiedSymbols(distributed_plan_.symbol_table));
left_cartesians_.push_back(left_op);
scan.set_input(std::make_shared<Once>());
AddForCartesian(&scan);
}
has_scan_all_ = true;
return true;
@ -193,13 +333,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
prev_ops_.pop_back();
should_split_ = true;
if (has_scan_all_) {
// Prepare for Cartesian planning
auto id = AddWorkerPlan(scan.input());
auto left_op = std::make_shared<PullRemote>(
scan.input(), id,
scan.input()->ModifiedSymbols(distributed_plan_.symbol_table));
left_cartesians_.push_back(left_op);
scan.set_input(std::make_shared<Once>());
AddForCartesian(&scan);
}
has_scan_all_ = true;
return true;
@ -367,7 +501,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(Aggregate &aggr_op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(aggr_op, PlanCartesian(aggr_op.input()));
return true;
}
@ -525,7 +659,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(Produce &produce) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
// TODO: It might be better to plan Cartesians later if this Produce isn't
// the last one and is not followed by an operator which requires a merge
// point (Skip, OrderBy, etc.).
@ -590,7 +724,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(CreateNode &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
// Creation needs to be modified if running on master, so as to distribute
@ -608,7 +742,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(CreateExpand &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
@ -621,7 +755,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(Delete &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
@ -634,7 +768,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(SetProperty &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
@ -647,7 +781,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(SetProperties &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
@ -660,7 +794,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(SetLabels &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
@ -673,7 +807,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(RemoveProperty &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
@ -686,7 +820,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(RemoveLabels &op) override {
prev_ops_.pop_back();
if (!left_cartesians_.empty()) {
if (!cartesian_branches_.empty()) {
Split(op, PlanCartesian(op.input()));
}
needs_synchronize_ = true;
@ -704,11 +838,46 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
private:
struct CartesianBranch {
// Independent subtree that can become a Cartesian branch
std::shared_ptr<LogicalOperator> subtree;
// Optional parent of the subtree which needs to come after the Cartesian
// operator, because it depends on some other Cartesian branch.
// parent_start is shared_ptr, because the ownership will be transfered to
// some other operator.
std::shared_ptr<LogicalOperator> parent_start;
// parent_end is pointer, because we may only change its input.
LogicalOperator *parent_end{nullptr};
// Minimum index of the branch this parent depends on.
int64_t depends_on{-1};
};
CartesianBranch MakeCartesianBranch(
const std::shared_ptr<LogicalOperator> &input) {
CartesianBranch branch;
std::tie(branch.subtree, branch.parent_end) = FindIndependentSubtree(
input, cartesian_symbols_, &distributed_plan_.symbol_table);
if (branch.parent_end) {
// This branch depends on another, so we need to set on which one and
// store the parent subtree which contains that dependency.
branch.parent_start = input;
branch.depends_on = cartesian_branches_.size();
}
// Send the independent subtree to workers and wire it in PullRemote
auto id = AddWorkerPlan(branch.subtree);
branch.subtree = std::make_shared<PullRemote>(
branch.subtree, id,
branch.subtree->ModifiedSymbols(distributed_plan_.symbol_table));
return branch;
}
DistributedPlan &distributed_plan_;
std::atomic<int64_t> &next_plan_id_;
std::vector<LogicalOperator *> prev_ops_;
// Left side operators that still need to be wired into Cartesian.
std::vector<std::shared_ptr<LogicalOperator>> left_cartesians_;
// Operators that still need to be wired into Cartesian.
std::vector<CartesianBranch> cartesian_branches_;
// Symbols modified by the currently stored Cartesian branches.
std::vector<Symbol> cartesian_symbols_;
bool has_scan_all_ = false;
bool needs_synchronize_ = false;
bool should_split_ = false;

View File

@ -751,7 +751,7 @@ property value.
:capnp-save #'save-operator-pointer
:capnp-load #'load-operator-pointer)
(input-symbol "Symbol" :reader t :scope :protected)
(existing-node :bool :scope :protected :documentation
(existing-node :bool :reader t :scope :protected :documentation
"If the given node atom refer to a symbol that has already
been expanded and should be just validated in the frame.")
(graph-view "GraphView" :reader t :scope :protected
@ -1052,7 +1052,7 @@ pulled.")
((input "std::shared_ptr<LogicalOperator>"
:capnp-save #'save-operator-pointer
:capnp-load #'load-operator-pointer)
(expression "Expression *"
(expression "Expression *" :reader t
:capnp-type "Ast.Tree" :capnp-init nil
:capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "Expression *")
:save-fun #'save-pointer :load-fun #'load-pointer))
@ -1454,8 +1454,8 @@ If a label does not exist on a Vertex, nothing happens.")
((input "std::shared_ptr<LogicalOperator>"
:capnp-save #'save-operator-pointer
:capnp-load #'load-operator-pointer)
(expand-symbol "Symbol")
(previous-symbols "std::vector<Symbol>"
(expand-symbol "Symbol" :reader t)
(previous-symbols "std::vector<Symbol>" :reader t
:capnp-save (lcp:capnp-save-vector "::query::capnp::Symbol" "Symbol")
:capnp-load (lcp:capnp-load-vector "::query::capnp::Symbol" "Symbol")))
(:documentation