Pass a flag for distributed CreateNode

Reviewers: florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1218
This commit is contained in:
Teon Banek 2018-02-21 13:55:06 +01:00
parent 5ff1ca4a5d
commit c7eaf4711a
3 changed files with 31 additions and 13 deletions

View File

@ -568,8 +568,6 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
// CRUD operators follow
bool PreVisit(CreateNode &op) override {
// TODO: Creation needs to be modified if running on master, so as to
// distribute node creation to workers.
prev_ops_.push_back(&op);
return true;
}
@ -578,6 +576,11 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
if (!left_cartesians_.empty()) {
Split(op, PlanCartesian(op.input()));
}
// Creation needs to be modified if running on master, so as to distribute
// node creation to workers.
if (!ShouldSplit()) {
op.set_on_random_worker(true);
}
needs_synchronize_ = true;
return true;
}

View File

@ -255,13 +255,15 @@ class CreateNode : public LogicalOperator {
auto input() const { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) { input_ = input; }
auto on_random_worker() const { return on_random_worker_; }
void set_on_random_worker(bool v) { on_random_worker_ = v; }
private:
CreateNode() {}
std::shared_ptr<LogicalOperator> input_;
NodeAtom *node_atom_ = nullptr;
bool on_random_worker_;
bool on_random_worker_ = false;
class CreateNodeCursor : public Cursor {
public:

View File

@ -151,7 +151,6 @@ class OpChecker : public BaseOpChecker {
virtual void ExpectOp(TOp &, const SymbolTable &) {}
};
using ExpectCreateNode = OpChecker<CreateNode>;
using ExpectCreateExpand = OpChecker<CreateExpand>;
using ExpectDelete = OpChecker<Delete>;
using ExpectScanAll = OpChecker<ScanAll>;
@ -433,6 +432,19 @@ class ExpectCartesian : public OpChecker<Cartesian> {
const std::list<std::unique_ptr<BaseOpChecker>> &right_;
};
class ExpectCreateNode : public OpChecker<CreateNode> {
public:
ExpectCreateNode(bool on_random_worker = false)
: on_random_worker_(on_random_worker) {}
void ExpectOp(CreateNode &op, const SymbolTable &) override {
EXPECT_EQ(op.on_random_worker(), on_random_worker_);
}
private:
bool on_random_worker_ = false;
};
auto MakeSymbolTable(query::Query &query) {
SymbolTable symbol_table;
SymbolGenerator symbol_generator(symbol_table);
@ -644,7 +656,7 @@ TYPED_TEST(TestPlanner, CreateNodeReturn) {
ExpectProduce());
{
auto expected = ExpectDistributed(MakeCheckers(
ExpectCreateNode(), ExpectSynchronize(false), ExpectProduce()));
ExpectCreateNode(true), ExpectSynchronize(false), ExpectProduce()));
std::atomic<int64_t> next_plan_id{0};
auto distributed_plan =
MakeDistributedPlan(planner.plan(), symbol_table, next_plan_id);
@ -662,7 +674,7 @@ TYPED_TEST(TestPlanner, CreateExpand) {
NODE("n"), EDGE("r", Direction::OUT, {relationship}), NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateExpand(),
MakeCheckers(ExpectCreateNode(true), ExpectCreateExpand(),
ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
@ -674,7 +686,7 @@ TYPED_TEST(TestPlanner, CreateMultipleNode) {
QUERY(SINGLE_QUERY(CREATE(PATTERN(NODE("n")), PATTERN(NODE("m")))));
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateNode());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateNode(),
MakeCheckers(ExpectCreateNode(true), ExpectCreateNode(true),
ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
@ -692,8 +704,8 @@ TYPED_TEST(TestPlanner, CreateNodeExpandNode) {
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectCreateNode());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateExpand(), ExpectCreateNode(),
ExpectSynchronize(false)),
MakeCheckers(ExpectCreateNode(true), ExpectCreateExpand(),
ExpectCreateNode(true), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
}
@ -709,7 +721,7 @@ TYPED_TEST(TestPlanner, CreateNamedPattern) {
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectConstructNamedPath());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateExpand(),
MakeCheckers(ExpectCreateNode(true), ExpectCreateExpand(),
ExpectConstructNamedPath(), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
@ -1082,7 +1094,7 @@ TYPED_TEST(TestPlanner, CreateMultiExpand) {
CheckPlan<TypeParam>(storage, ExpectCreateNode(), ExpectCreateExpand(),
ExpectCreateExpand());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectCreateExpand(),
MakeCheckers(ExpectCreateNode(true), ExpectCreateExpand(),
ExpectCreateExpand(), ExpectSynchronize(false)),
{}};
CheckDistributedPlan<TypeParam>(storage, expected);
@ -1214,8 +1226,9 @@ TYPED_TEST(TestPlanner, CreateWithSkipReturnLimit) {
CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc,
ExpectProduce(), ExpectSkip(), ExpectProduce(), ExpectLimit());
ExpectedDistributedPlan expected{
MakeCheckers(ExpectCreateNode(), ExpectSynchronize(true), ExpectProduce(),
ExpectSkip(), ExpectProduce(), ExpectLimit()),
MakeCheckers(ExpectCreateNode(true), ExpectSynchronize(true),
ExpectProduce(), ExpectSkip(), ExpectProduce(),
ExpectLimit()),
{}};
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}