Always plan Synchronize for write queries

Reviewers: florijan, msantl

Reviewed By: florijan, msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1179
This commit is contained in:
Teon Banek 2018-02-14 16:33:01 +01:00
parent 4225d5c871
commit 8856682c7f
5 changed files with 158 additions and 55 deletions

View File

@ -104,27 +104,6 @@ Interpreter::Results Interpreter::operator()(
}
}
auto *logical_plan = &plan->plan();
// TODO review: is the check below necessary?
DCHECK(dynamic_cast<const plan::Produce *>(logical_plan) ||
dynamic_cast<const plan::Skip *>(logical_plan) ||
dynamic_cast<const plan::Limit *>(logical_plan) ||
dynamic_cast<const plan::OrderBy *>(logical_plan) ||
dynamic_cast<const plan::Distinct *>(logical_plan) ||
dynamic_cast<const plan::Union *>(logical_plan) ||
dynamic_cast<const plan::CreateNode *>(logical_plan) ||
dynamic_cast<const plan::CreateExpand *>(logical_plan) ||
dynamic_cast<const plan::SetProperty *>(logical_plan) ||
dynamic_cast<const plan::SetProperties *>(logical_plan) ||
dynamic_cast<const plan::SetLabels *>(logical_plan) ||
dynamic_cast<const plan::RemoveProperty *>(logical_plan) ||
dynamic_cast<const plan::RemoveLabels *>(logical_plan) ||
dynamic_cast<const plan::Delete *>(logical_plan) ||
dynamic_cast<const plan::Merge *>(logical_plan) ||
dynamic_cast<const plan::CreateIndex *>(logical_plan) ||
dynamic_cast<const plan::PullRemote *>(logical_plan))
<< "Unknown top level LogicalOperator";
ctx.symbol_table_ = plan->symbol_table();
auto planning_time = planning_timer.Elapsed();
@ -140,10 +119,10 @@ Interpreter::Results Interpreter::operator()(
// have to be correct (for Bolt clients)
summary["type"] = "rw";
auto cursor = logical_plan->MakeCursor(ctx.db_accessor_);
auto cursor = plan->plan().MakeCursor(ctx.db_accessor_);
std::vector<std::string> header;
std::vector<Symbol> output_symbols(
logical_plan->OutputSymbols(ctx.symbol_table_));
plan->plan().OutputSymbols(ctx.symbol_table_));
for (const auto &symbol : output_symbols) {
// When the symbol is aliased or expanded from '*' (inside RETURN or
// WITH), then there is no token position, so use symbol name.

View File

@ -42,12 +42,14 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
// Returns true if the plan should be run on master and workers. Note, that
// false is returned if the plan is already split.
bool ShouldSplit() {
bool ShouldSplit() const {
// At the moment, the plan should be run on workers only if we encountered a
// ScanAll.
return !distributed_plan_.worker_plan && has_scan_all_;
}
bool NeedsSynchronize() const { return needs_synchronize_; }
// ScanAll are all done on each machine locally.
bool PreVisit(ScanAll &scan) override {
prev_ops_.push_back(&scan);
@ -410,19 +412,20 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
bool PostVisit(Accumulate &acc) override {
prev_ops_.pop_back();
if (!ShouldSplit()) return true;
if (acc.advance_command())
throw utils::NotYetImplemented("WITH clause distributed planning");
// Accumulate on workers, but set advance_command to false, because the
// Synchronize operator should do that in distributed execution.
distributed_plan_.worker_plan =
std::make_shared<Accumulate>(acc.input(), acc.symbols(), false);
DCHECK(needs_synchronize_)
<< "Expected Accumulate to follow a write operator";
// Create a synchronization point. Use pull remote to fetch accumulated
// symbols from workers. Local input operations are the same as on workers.
auto pull_remote = std::make_shared<PullRemote>(
nullptr, distributed_plan_.plan_id, acc.symbols());
auto sync = std::make_shared<Synchronize>(
distributed_plan_.worker_plan, pull_remote, acc.advance_command());
// symbols from workers. Accumulation is done through Synchronize, so we
// don't need the Accumulate operator itself. Local input operations are the
// same as on workers.
std::shared_ptr<PullRemote> pull_remote;
if (ShouldSplit()) {
distributed_plan_.worker_plan = acc.input();
pull_remote = std::make_shared<PullRemote>(
nullptr, distributed_plan_.plan_id, acc.symbols());
}
auto sync = std::make_shared<Synchronize>(acc.input(), pull_remote,
acc.advance_command());
auto *prev_op = prev_ops_.back();
// Wire the previous operator (on master) into our synchronization operator.
// TODO: Find a better way to replace the previous operation's input than
@ -432,8 +435,9 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
} else if (auto *aggr_op = dynamic_cast<Aggregate *>(prev_op)) {
aggr_op->set_input(sync);
} else {
throw utils::NotYetImplemented("WITH clause distributed planning");
throw utils::NotYetImplemented("distributed planning");
}
needs_synchronize_ = false;
return true;
}
@ -444,41 +448,89 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
return true;
}
bool PostVisit(CreateNode &op) override {
prev_ops_.pop_back();
needs_synchronize_ = true;
return true;
}
bool PreVisit(CreateExpand &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(CreateExpand &op) override {
prev_ops_.pop_back();
needs_synchronize_ = true;
return true;
}
bool PreVisit(Delete &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(Delete &op) override {
prev_ops_.pop_back();
needs_synchronize_ = true;
return true;
}
bool PreVisit(SetProperty &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(SetProperty &op) override {
prev_ops_.pop_back();
needs_synchronize_ = true;
return true;
}
bool PreVisit(SetProperties &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(SetProperties &op) override {
prev_ops_.pop_back();
needs_synchronize_ = true;
return true;
}
bool PreVisit(SetLabels &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(SetLabels &op) override {
prev_ops_.pop_back();
needs_synchronize_ = true;
return true;
}
bool PreVisit(RemoveProperty &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(RemoveProperty &op) override {
prev_ops_.pop_back();
needs_synchronize_ = true;
return true;
}
bool PreVisit(RemoveLabels &op) override {
prev_ops_.push_back(&op);
return true;
}
bool PostVisit(RemoveLabels &op) override {
prev_ops_.pop_back();
needs_synchronize_ = true;
return true;
}
protected:
bool DefaultPreVisit() override {
throw utils::NotYetImplemented("distributed planning");
@ -496,6 +548,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
std::unique_ptr<LogicalOperator> master_aggr_;
std::vector<LogicalOperator *> prev_ops_;
bool has_scan_all_ = false;
bool needs_synchronize_ = false;
void RaiseIfCartesian() {
if (has_scan_all_)
@ -526,10 +579,24 @@ DistributedPlan MakeDistributedPlan(const LogicalOperator &original_plan,
// We haven't split the plan, this means that it should be the same on
// master and worker. We only need to prepend PullRemote to master plan.
distributed_plan.worker_plan = std::move(distributed_plan.master_plan);
distributed_plan.master_plan = std::make_unique<PullRemote>(
distributed_plan.worker_plan, distributed_plan.plan_id,
distributed_plan.worker_plan->OutputSymbols(
distributed_plan.symbol_table));
// If the plan performs writes, we need to finish with Synchronize.
if (planner.NeedsSynchronize()) {
auto pull_remote = std::make_shared<PullRemote>(
nullptr, distributed_plan.plan_id,
distributed_plan.worker_plan->OutputSymbols(
distributed_plan.symbol_table));
distributed_plan.master_plan = std::make_unique<Synchronize>(
distributed_plan.worker_plan, pull_remote, false);
} else {
distributed_plan.master_plan = std::make_unique<PullRemote>(
distributed_plan.worker_plan, distributed_plan.plan_id,
distributed_plan.worker_plan->OutputSymbols(
distributed_plan.symbol_table));
}
} else if (!distributed_plan.worker_plan && planner.NeedsSynchronize()) {
// If the plan performs writes, we still need to Synchronize.
distributed_plan.master_plan = std::make_unique<Synchronize>(
std::move(distributed_plan.master_plan), nullptr, false);
}
return distributed_plan;
}

View File

@ -2362,6 +2362,11 @@ class Synchronize : public LogicalOperator {
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(
const SymbolTable &symbol_table) const override {
return input_->OutputSymbols(symbol_table);
}
auto input() const { return input_; }
auto pull_remote() const { return pull_remote_; }
auto advance_command() const { return advance_command_; }

View File

@ -522,7 +522,7 @@ class PlanPrinter : public query::plan::HierarchicalLogicalOperatorVisitor {
out << "* Synchronize";
if (op.advance_command()) out << " (ADV CMD)";
});
Branch(*op.pull_remote());
if (op.pull_remote()) Branch(*op.pull_remote());
op.input()->Accept(*this);
return false;
}

View File

@ -384,16 +384,28 @@ class ExpectPullRemote : public OpChecker<PullRemote> {
class ExpectSynchronize : public OpChecker<Synchronize> {
public:
ExpectSynchronize() {}
ExpectSynchronize(const std::vector<Symbol> &symbols)
: expect_pull_(symbols) {}
explicit ExpectSynchronize(bool advance_command)
: has_pull_(false), advance_command_(advance_command) {}
ExpectSynchronize(const std::vector<Symbol> &symbols = {},
bool advance_command = false)
: expect_pull_(symbols),
has_pull_(true),
advance_command_(advance_command) {}
void ExpectOp(Synchronize &op, const SymbolTable &symbol_table) override {
expect_pull_.ExpectOp(*op.pull_remote(), symbol_table);
if (has_pull_) {
ASSERT_TRUE(op.pull_remote());
expect_pull_.ExpectOp(*op.pull_remote(), symbol_table);
} else {
EXPECT_FALSE(op.pull_remote());
}
EXPECT_EQ(op.advance_command(), advance_command_);
}
private:
ExpectPullRemote expect_pull_;
bool has_pull_ = true;
bool advance_command_ = false;
};
auto MakeSymbolTable(query::Query &query) {
@ -565,7 +577,9 @@ TYPED_TEST(TestPlanner, CreateNodeReturn) {
ExpectProduce());
{
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), acc, ExpectProduce()), {}};
MakeCheckers(ExpectCreateNode(), ExpectSynchronize(false),
ExpectProduce()),
{}};
std::atomic<int64_t> next_plan_id{0};
auto distributed_plan =
MakeDistributedPlan(planner.plan(), symbol_table, next_plan_id);
@ -582,6 +596,11 @@ TYPED_TEST(TestPlanner, CreateExpand) {
QUERY(SINGLE_QUERY(CREATE(PATTERN(
NODE("n"), EDGE("r", Direction::OUT, {relationship}), NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateExpand(),
ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
TYPED_TEST(TestPlanner, CreateMultipleNode) {
@ -589,6 +608,11 @@ TYPED_TEST(TestPlanner, CreateMultipleNode) {
AstTreeStorage storage;
QUERY(SINGLE_QUERY(CREATE(PATTERN(NODE("n")), PATTERN(NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateNode());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateNode(),
ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
TYPED_TEST(TestPlanner, CreateNodeExpandNode) {
@ -602,6 +626,11 @@ TYPED_TEST(TestPlanner, CreateNodeExpandNode) {
PATTERN(NODE("l")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectCreateNode());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateExpand(), ExpectCreateNode(),
ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
TYPED_TEST(TestPlanner, CreateNamedPattern) {
@ -614,6 +643,11 @@ TYPED_TEST(TestPlanner, CreateNamedPattern) {
"p", NODE("n"), EDGE("r", Direction::OUT, {relationship}), NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectConstructNamedPath());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateExpand(),
ExpectConstructNamedPath(), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
TYPED_TEST(TestPlanner, MatchCreateExpand) {
@ -628,7 +662,7 @@ TYPED_TEST(TestPlanner, MatchCreateExpand) {
NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectCreateExpand());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectCreateExpand(), ExpectPullRemote()),
MakeCheckers(ExpectScanAll(), ExpectCreateExpand(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectCreateExpand())};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -732,7 +766,8 @@ TYPED_TEST(TestPlanner, OptionalMatchNamedPatternReturn) {
auto edge = EDGE("r");
auto node_m = NODE("m");
auto pattern = NAMED_PATTERN("p", node_n, edge, node_m);
QUERY(SINGLE_QUERY(OPTIONAL_MATCH(pattern), RETURN("p")));
auto as_p = AS("p");
QUERY(SINGLE_QUERY(OPTIONAL_MATCH(pattern), RETURN("p", as_p)));
auto symbol_table = MakeSymbolTable(*storage.query());
std::list<BaseOpChecker *> optional{new ExpectScanAll(), new ExpectExpand(),
new ExpectConstructNamedPath()};
@ -744,6 +779,12 @@ TYPED_TEST(TestPlanner, OptionalMatchNamedPatternReturn) {
auto planner = MakePlanner<TypeParam>(db, storage, symbol_table);
CheckPlan(planner.plan(), symbol_table,
ExpectOptional(optional_symbols, optional), ExpectProduce());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectOptional(optional_symbols, optional), ExpectProduce(),
ExpectPullRemote({symbol_table.at(*as_p)})),
MakeCheckers(ExpectOptional(optional_symbols, optional),
ExpectProduce())};
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
TYPED_TEST(TestPlanner, MatchWhereReturn) {
@ -773,7 +814,7 @@ TYPED_TEST(TestPlanner, MatchDelete) {
QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), DELETE(IDENT("n"))));
CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectDelete());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectDelete(), ExpectPullRemote()),
MakeCheckers(ExpectScanAll(), ExpectDelete(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectDelete())};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -792,7 +833,7 @@ TYPED_TEST(TestPlanner, MatchNodeSet) {
ExpectSetProperties(), ExpectSetLabels());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectSetProperty(), ExpectSetProperties(),
ExpectSetLabels(), ExpectPullRemote()),
ExpectSetLabels(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectSetProperty(), ExpectSetProperties(),
ExpectSetLabels())};
CheckDistributedPlan<TypeParam>(storage, expected);
@ -811,7 +852,7 @@ TYPED_TEST(TestPlanner, MatchRemove) {
ExpectRemoveLabels());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectRemoveProperty(),
ExpectRemoveLabels(), ExpectPullRemote()),
ExpectRemoveLabels(), ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectRemoveProperty(),
ExpectRemoveLabels())};
CheckDistributedPlan<TypeParam>(storage, expected);
@ -947,6 +988,11 @@ TYPED_TEST(TestPlanner, CreateMultiExpand) {
PATTERN(NODE("n"), EDGE("p", Direction::OUT, {p}), NODE("l")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectCreateExpand());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateExpand(),
ExpectCreateExpand(), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
TYPED_TEST(TestPlanner, MatchWithSumWhereReturn) {
@ -1032,7 +1078,7 @@ TYPED_TEST(TestPlanner, MatchWithCreate) {
ExpectCreateExpand());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectCreateExpand(),
ExpectPullRemote()),
ExpectSynchronize()),
MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectCreateExpand())};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -1074,6 +1120,12 @@ TYPED_TEST(TestPlanner, CreateWithSkipReturnLimit) {
// us here (but who knows if they change it again).
CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc,
ExpectProduce(), ExpectSkip(), ExpectProduce(), ExpectLimit());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectSynchronize(true),
ExpectProduce(), ExpectSkip(), ExpectProduce(),
ExpectLimit()),
{}};
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
TYPED_TEST(TestPlanner, CreateReturnSumSkipLimit) {
@ -2012,10 +2064,10 @@ TYPED_TEST(TestPlanner, DistributedMatchCreateReturn) {
database::Master db;
auto planner = MakePlanner<TypeParam>(db, storage, symbol_table);
ExpectedDistributedPlan expected{
MakeCheckers(ExpectScanAll(), ExpectCreateNode(), acc,
MakeCheckers(ExpectScanAll(), ExpectCreateNode(),
ExpectSynchronize({symbol_table.at(*ident_m)}),
ExpectProduce()),
MakeCheckers(ExpectScanAll(), ExpectCreateNode(), acc)};
MakeCheckers(ExpectScanAll(), ExpectCreateNode())};
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
} // namespace