Extract distributed Expand

Summary: Depends on D1575

Reviewers: mtomic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1576
This commit is contained in:
Teon Banek 2018-08-30 13:42:10 +02:00
parent 80f649bcd1
commit c4958d9960
8 changed files with 386 additions and 160 deletions

View File

@ -266,11 +266,11 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor {
return true;
}
bool PreVisit(Expand &exp) override {
bool PreVisit(DistributedExpand &exp) override {
prev_ops_.push_back(&exp);
return true;
}
bool PostVisit(Expand &exp) override {
bool PostVisit(DistributedExpand &exp) override {
prev_ops_.pop_back();
if (branch_.subtree) return true;
if (auto found = FindForbidden(exp.input_symbol())) {
@ -968,11 +968,20 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
}
// Expand is done locally on each machine with RPC calls for worker-boundary
// crossing edges.
// crossing edges. We replace it with DistributedExpand which does RPC calls
// asynchronously.
bool PreVisit(Expand &exp) override {
prev_ops_.push_back(&exp);
return true;
}
bool PostVisit(Expand &exp) override {
prev_ops_.pop_back();
auto distributed_expand = std::make_unique<DistributedExpand>(
exp.node_symbol(), exp.edge_symbol(), exp.direction(), exp.edge_types(),
exp.input(), exp.input_symbol(), exp.existing_node(), exp.graph_view());
SetOnPrevious(std::move(distributed_expand));
return true;
}
bool PreVisit(ExpandVariable &exp) override {
prev_ops_.push_back(&exp);
@ -1048,6 +1057,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
optional_planner.lhs_optional_symbols_ =
op.input()->ModifiedSymbols(optional_plan.symbol_table);
optional_plan.master_plan->Accept(optional_planner);
CHECK(dynamic_cast<Produce *>(optional_plan.master_plan.get()));
// Revert storage and symbol table
distributed_plan_.ast_storage = std::move(optional_plan.ast_storage);
distributed_plan_.symbol_table = std::move(optional_plan.symbol_table);
@ -1057,6 +1067,9 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
// Case 1)
// Optional subtree doesn't create any worker plans (i.e. has no ScanAll),
// we continue as normal.
SetOnPrevious(std::make_unique<Optional>(
op.input(), optional_plan.master_plan->input(),
op.optional_symbols()));
return true;
}
// Case 2)
@ -1076,7 +1089,6 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
for (const auto &plan : optional_plan.worker_plans) {
distributed_plan_.worker_plans.emplace_back(plan);
}
CHECK(dynamic_cast<Produce *>(optional_plan.master_plan.get()));
if (optional_planner.on_master_) {
// This means that optional planned a Cartesian and the dependencies
// on LHS symbols should have been taken care of.

View File

@ -108,6 +108,16 @@ std::vector<Symbol> PullRemoteOrderBy::ModifiedSymbols(
return input_->ModifiedSymbols(table);
}
ACCEPT_WITH_INPUT(DistributedExpand);
std::vector<Symbol> DistributedExpand::ModifiedSymbols(
const SymbolTable &table) const {
auto symbols = input_->ModifiedSymbols(table);
symbols.emplace_back(node_symbol());
symbols.emplace_back(edge_symbol());
return symbols;
}
DistributedExpandBfs::DistributedExpandBfs(
Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Direction direction,
const std::vector<storage::EdgeType> &edge_types,
@ -165,6 +175,7 @@ std::vector<Symbol> DistributedCreateExpand::ModifiedSymbols(
symbols.emplace_back(table.at(*edge_atom_->identifier_));
return symbols;
}
//////////////////////////////////////////////////////////////////////
// Cursors
//////////////////////////////////////////////////////////////////////
@ -749,6 +760,220 @@ class PullRemoteOrderByCursor : public Cursor {
bool merge_initialized_ = false;
};
class DistributedExpandCursor : public query::plan::Cursor {
public:
DistributedExpandCursor(const DistributedExpand *self,
database::GraphDbAccessor *db)
: input_cursor_(self->input()->MakeCursor(*db)), self_(self) {}
bool Pull(Frame &frame, Context &context) {
// A helper function for expanding a node from an edge.
auto pull_node = [this, &frame](const EdgeAccessor &new_edge,
EdgeAtom::Direction direction) {
if (self_->existing_node()) return;
switch (direction) {
case EdgeAtom::Direction::IN:
frame[self_->node_symbol()] = new_edge.from();
break;
case EdgeAtom::Direction::OUT:
frame[self_->node_symbol()] = new_edge.to();
break;
case EdgeAtom::Direction::BOTH:
LOG(FATAL) << "Must indicate exact expansion direction here";
}
};
auto push_future_edge = [this, &frame](auto edge, auto direction) {
auto edge_to = std::async(std::launch::async, [edge, direction]() {
if (direction == EdgeAtom::Direction::IN)
return std::make_pair(edge, edge.from());
if (direction == EdgeAtom::Direction::OUT)
return std::make_pair(edge, edge.to());
LOG(FATAL) << "Must indicate exact expansion direction here";
});
future_expands_.emplace_back(
FutureExpand{utils::make_future(std::move(edge_to)), frame.elems()});
};
auto find_ready_future = [this]() {
return std::find_if(
future_expands_.begin(), future_expands_.end(),
[](const auto &future) { return future.edge_to.IsReady(); });
};
auto put_future_edge_on_frame = [this, &frame](auto &future) {
auto edge_to = future.edge_to.get();
frame.elems() = future.frame_elems;
frame[self_->edge_symbol()] = edge_to.first;
frame[self_->node_symbol()] = edge_to.second;
};
while (true) {
if (context.db_accessor_.should_abort()) throw HintedAbortError();
// Try to get any remote edges we may have available first. If we yielded
// all of the local edges first, we may accumulate large amounts of future
// edges.
{
auto future_it = find_ready_future();
if (future_it != future_expands_.end()) {
// Backup the current frame (if we haven't done so already) before
// putting the future edge.
if (last_frame_.empty()) last_frame_ = frame.elems();
put_future_edge_on_frame(*future_it);
// Erase the future and return true to yield the result.
future_expands_.erase(future_it);
return true;
}
}
// In case we have replaced the frame with the one for a future edge,
// restore it.
if (!last_frame_.empty()) {
frame.elems() = last_frame_;
last_frame_.clear();
}
// attempt to get a value from the incoming edges
if (in_edges_ && *in_edges_it_ != in_edges_->end()) {
auto edge = *(*in_edges_it_)++;
if (edge.address().is_local() || self_->existing_node()) {
frame[self_->edge_symbol()] = edge;
pull_node(edge, EdgeAtom::Direction::IN);
return true;
} else {
push_future_edge(edge, EdgeAtom::Direction::IN);
continue;
}
}
// attempt to get a value from the outgoing edges
if (out_edges_ && *out_edges_it_ != out_edges_->end()) {
auto edge = *(*out_edges_it_)++;
// when expanding in EdgeAtom::Direction::BOTH directions
// we should do only one expansion for cycles, and it was
// already done in the block above
if (self_->direction() == EdgeAtom::Direction::BOTH && edge.is_cycle())
continue;
if (edge.address().is_local() || self_->existing_node()) {
frame[self_->edge_symbol()] = edge;
pull_node(edge, EdgeAtom::Direction::OUT);
return true;
} else {
push_future_edge(edge, EdgeAtom::Direction::OUT);
continue;
}
}
// if we are here, either the edges have not been initialized,
// or they have been exhausted. attempt to initialize the edges,
// if the input is exhausted
if (!InitEdges(frame, context)) {
// We are done with local and remote edges so return false.
if (future_expands_.empty()) return false;
// We still need to yield remote edges.
auto future_it = find_ready_future();
if (future_it != future_expands_.end()) {
put_future_edge_on_frame(*future_it);
// Erase the future and return true to yield the result.
future_expands_.erase(future_it);
return true;
}
// We are still waiting for future edges, so sleep and fallthrough to
// continue the loop.
std::this_thread::sleep_for(
std::chrono::microseconds(FLAGS_remote_pull_sleep_micros));
}
// we have re-initialized the edges, continue with the loop
}
}
void Reset() {
input_cursor_->Reset();
in_edges_ = std::experimental::nullopt;
in_edges_it_ = std::experimental::nullopt;
out_edges_ = std::experimental::nullopt;
out_edges_it_ = std::experimental::nullopt;
future_expands_.clear();
last_frame_.clear();
}
bool InitEdges(Frame &frame, Context &context) {
// Input Vertex could be null if it is created by a failed optional match.
// In those cases we skip that input pull and continue with the next.
while (true) {
if (!input_cursor_->Pull(frame, context)) return false;
TypedValue &vertex_value = frame[self_->input_symbol()];
// Null check due to possible failed optional match.
if (vertex_value.IsNull()) continue;
ExpectType(self_->input_symbol(), vertex_value, TypedValue::Type::Vertex);
auto &vertex = vertex_value.Value<VertexAccessor>();
SwitchAccessor(vertex, self_->graph_view());
auto direction = self_->direction();
if (direction == EdgeAtom::Direction::IN ||
direction == EdgeAtom::Direction::BOTH) {
if (self_->existing_node()) {
TypedValue &existing_node = frame[self_->node_symbol()];
// old_node_value may be Null when using optional matching
if (!existing_node.IsNull()) {
ExpectType(self_->node_symbol(), existing_node,
TypedValue::Type::Vertex);
in_edges_.emplace(
vertex.in(existing_node.ValueVertex(), &self_->edge_types()));
}
} else {
in_edges_.emplace(vertex.in(&self_->edge_types()));
}
in_edges_it_.emplace(in_edges_->begin());
}
if (direction == EdgeAtom::Direction::OUT ||
direction == EdgeAtom::Direction::BOTH) {
if (self_->existing_node()) {
TypedValue &existing_node = frame[self_->node_symbol()];
// old_node_value may be Null when using optional matching
if (!existing_node.IsNull()) {
ExpectType(self_->node_symbol(), existing_node,
TypedValue::Type::Vertex);
out_edges_.emplace(
vertex.out(existing_node.ValueVertex(), &self_->edge_types()));
}
} else {
out_edges_.emplace(vertex.out(&self_->edge_types()));
}
out_edges_it_.emplace(out_edges_->begin());
}
return true;
}
}
private:
struct FutureExpand {
utils::Future<std::pair<EdgeAccessor, VertexAccessor>> edge_to;
std::vector<TypedValue> frame_elems;
};
std::unique_ptr<query::plan::Cursor> input_cursor_;
const DistributedExpand *self_{nullptr};
// The iterable over edges and the current edge iterator are referenced via
// optional because they can not be initialized in the constructor of
// this class. They are initialized once for each pull from the input.
std::experimental::optional<DistributedExpand::InEdgeT> in_edges_;
std::experimental::optional<DistributedExpand::InEdgeIteratorT> in_edges_it_;
std::experimental::optional<DistributedExpand::OutEdgeT> out_edges_;
std::experimental::optional<DistributedExpand::OutEdgeIteratorT>
out_edges_it_;
// Stores the last frame before we yield the frame for future edge. It needs
// to be restored afterward.
std::vector<TypedValue> last_frame_;
// Edges which are being asynchronously fetched from a remote worker.
// NOTE: This should be destructed first to ensure that no invalid
// references or pointers exists to other objects of this class.
std::vector<FutureExpand> future_expands_;
};
class DistributedExpandBfsCursor : public query::plan::Cursor {
public:
DistributedExpandBfsCursor(const DistributedExpandBfs &self,
@ -1104,6 +1329,11 @@ std::unique_ptr<Cursor> PullRemoteOrderBy::MakeCursor(
return std::make_unique<PullRemoteOrderByCursor>(*this, db);
}
std::unique_ptr<Cursor> DistributedExpand::MakeCursor(
database::GraphDbAccessor &db) const {
return std::make_unique<DistributedExpandCursor>(this, &db);
}
std::unique_ptr<Cursor> DistributedExpandBfs::MakeCursor(
database::GraphDbAccessor &db) const {
return std::make_unique<DistributedExpandBfsCursor>(*this, db);

View File

@ -15,14 +15,15 @@ cpp<#
class PullRemote;
class Synchronize;
class PullRemoteOrderBy;
class DistributedExpand;
class DistributedExpandBfs;
class DistributedCreateNode;
class DistributedCreateExpand;
using DistributedOperatorCompositeVisitor =
::utils::CompositeVisitor<PullRemote, Synchronize, PullRemoteOrderBy,
DistributedExpandBfs, DistributedCreateNode,
DistributedCreateExpand>;
DistributedExpand, DistributedExpandBfs,
DistributedCreateNode, DistributedCreateExpand>;
/// Base class for visiting regular and distributed LogicalOperator instances.
class DistributedOperatorVisitor : public HierarchicalLogicalOperatorVisitor,
@ -174,6 +175,26 @@ by having only one result from each worker.")
(:private #>cpp PullRemoteOrderBy() {} cpp<#)
(:serialize :capnp))
(lcp:define-class distributed-expand (logical-operator expand-common)
()
(:documentation "Distributed version of Expand operator")
(:public
#>cpp
using ExpandCommon::ExpandCommon;
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
bool HasSingleInput() const override { return true; }
std::shared_ptr<LogicalOperator> input() const override { return input_; }
void set_input(std::shared_ptr<LogicalOperator> input) override {
input_ = input;
}
cpp<#)
(:serialize :capnp :inherit-compose '(expand-common)))
(lcp:define-class distributed-expand-bfs (logical-operator expand-common)
((lower-bound "Expression *" :reader t
:documentation "Optional lower bound, default is 1"

View File

@ -33,9 +33,6 @@
#include "utils/string.hpp"
#include "utils/thread/sync.hpp"
// TODO: Remove this when distributed logic is completely removed from here.
DECLARE_int32(remote_pull_sleep_micros);
// macro for the default implementation of LogicalOperator::Accept
// that accepts the visitor and visits it's input_ operator
#define ACCEPT_WITH_INPUT(class_name) \
@ -468,104 +465,36 @@ bool Expand::ExpandCursor::Pull(Frame &frame, Context &context) {
}
};
auto push_future_edge = [this, &frame](auto edge, auto direction) {
auto edge_to = std::async(std::launch::async, [edge, direction]() {
if (direction == EdgeAtom::Direction::IN)
return std::make_pair(edge, edge.from());
if (direction == EdgeAtom::Direction::OUT)
return std::make_pair(edge, edge.to());
LOG(FATAL) << "Must indicate exact expansion direction here";
});
future_expands_.emplace_back(
FutureExpand{utils::make_future(std::move(edge_to)), frame.elems()});
};
auto find_ready_future = [this]() {
return std::find_if(
future_expands_.begin(), future_expands_.end(),
[](const auto &future) { return future.edge_to.IsReady(); });
};
auto put_future_edge_on_frame = [this, &frame](auto &future) {
auto edge_to = future.edge_to.get();
frame.elems() = future.frame_elems;
frame[self_.edge_symbol_] = edge_to.first;
frame[self_.node_symbol_] = edge_to.second;
};
while (true) {
if (db_.should_abort()) throw HintedAbortError();
// Try to get any remote edges we may have available first. If we yielded
// all of the local edges first, we may accumulate large amounts of future
// edges.
{
auto future_it = find_ready_future();
if (future_it != future_expands_.end()) {
// Backup the current frame (if we haven't done so already) before
// putting the future edge.
if (last_frame_.empty()) last_frame_ = frame.elems();
put_future_edge_on_frame(*future_it);
// Erase the future and return true to yield the result.
future_expands_.erase(future_it);
return true;
}
}
// In case we have replaced the frame with the one for a future edge,
// restore it.
if (!last_frame_.empty()) {
frame.elems() = last_frame_;
last_frame_.clear();
}
// attempt to get a value from the incoming edges
if (in_edges_ && *in_edges_it_ != in_edges_->end()) {
auto edge = *(*in_edges_it_)++;
if (edge.address().is_local() || self_.existing_node_) {
frame[self_.edge_symbol_] = edge;
pull_node(edge, EdgeAtom::Direction::IN);
return true;
} else {
push_future_edge(edge, EdgeAtom::Direction::IN);
continue;
}
CHECK(edge.address().is_local() && edge.from().address().is_local())
<< "Expected Expand only in single node execution";
frame[self_.edge_symbol_] = edge;
pull_node(edge, EdgeAtom::Direction::IN);
return true;
}
// attempt to get a value from the outgoing edges
if (out_edges_ && *out_edges_it_ != out_edges_->end()) {
auto edge = *(*out_edges_it_)++;
CHECK(edge.address().is_local() && edge.to().address().is_local())
<< "Expected Expand only in single node execution";
// when expanding in EdgeAtom::Direction::BOTH directions
// we should do only one expansion for cycles, and it was
// already done in the block above
if (self_.direction_ == EdgeAtom::Direction::BOTH && edge.is_cycle())
continue;
if (edge.address().is_local() || self_.existing_node_) {
frame[self_.edge_symbol_] = edge;
pull_node(edge, EdgeAtom::Direction::OUT);
return true;
} else {
push_future_edge(edge, EdgeAtom::Direction::OUT);
continue;
}
frame[self_.edge_symbol_] = edge;
pull_node(edge, EdgeAtom::Direction::OUT);
return true;
}
// if we are here, either the edges have not been initialized,
// or they have been exhausted. attempt to initialize the edges,
// if the input is exhausted
if (!InitEdges(frame, context)) {
// We are done with local and remote edges so return false.
if (future_expands_.empty()) return false;
// We still need to yield remote edges.
auto future_it = find_ready_future();
if (future_it != future_expands_.end()) {
put_future_edge_on_frame(*future_it);
// Erase the future and return true to yield the result.
future_expands_.erase(future_it);
return true;
}
// We are still waiting for future edges, so sleep and fallthrough to
// continue the loop.
std::this_thread::sleep_for(
std::chrono::microseconds(FLAGS_remote_pull_sleep_micros));
}
// If we are here, either the edges have not been initialized,
// or they have been exhausted. Attempt to initialize the edges.
if (!InitEdges(frame, context)) return false;
// we have re-initialized the edges, continue with the loop
}
@ -577,8 +506,6 @@ void Expand::ExpandCursor::Reset() {
in_edges_it_ = std::experimental::nullopt;
out_edges_ = std::experimental::nullopt;
out_edges_it_ = std::experimental::nullopt;
future_expands_.clear();
last_frame_.clear();
}
bool Expand::ExpandCursor::InitEdges(Frame &frame, Context &context) {

View File

@ -725,17 +725,17 @@ expansion")
const std::shared_ptr<LogicalOperator> &input,
Symbol input_symbol, bool existing_node,
GraphView graph_view);
cpp<#)
(:protected
#>cpp
virtual ~ExpandCommon() {}
// types that we'll use for members in both subclasses
// types that we'll use for members in subclasses
using InEdgeT = decltype(std::declval<VertexAccessor>().in());
using InEdgeIteratorT = decltype(std::declval<VertexAccessor>().in().begin());
using OutEdgeT = decltype(std::declval<VertexAccessor>().out());
using OutEdgeIteratorT =
decltype(std::declval<VertexAccessor>().out().begin());
cpp<#)
(:protected
#>cpp
virtual ~ExpandCommon() {}
/**
* For a newly expanded node handles existence checking and
@ -794,11 +794,6 @@ pulled.")
void Reset() override;
private:
struct FutureExpand {
utils::Future<std::pair<EdgeAccessor, VertexAccessor>> edge_to;
std::vector<TypedValue> frame_elems;
};
const Expand &self_;
const std::unique_ptr<Cursor> input_cursor_;
database::GraphDbAccessor &db_;
@ -810,13 +805,6 @@ pulled.")
std::experimental::optional<InEdgeIteratorT> in_edges_it_;
std::experimental::optional<OutEdgeT> out_edges_;
std::experimental::optional<OutEdgeIteratorT> out_edges_it_;
// Stores the last frame before we yield the frame for future edge. It needs
// to be restored afterward.
std::vector<TypedValue> last_frame_;
// Edges which are being asynchronously fetched from a remote worker.
// NOTE: This should be destructed first to ensure that no invalid
// references or pointers exists to other objects of this class.
std::vector<FutureExpand> future_expands_;
bool InitEdges(Frame &, Context &);
};

View File

@ -80,6 +80,14 @@ class PlanPrinter final : public DistributedOperatorVisitor {
return true;
}
bool PreVisit(query::plan::DistributedExpand &op) override {
WithPrintLn([&](auto &out) {
out << "* DistributedExpand";
PrintExpand(out, op);
});
return true;
}
bool PreVisit(query::plan::DistributedExpandBfs &op) override {
WithPrintLn([&](auto &out) {
out << "* DistributedExpandBfs";

View File

@ -33,6 +33,28 @@ using namespace distributed;
using namespace database;
using namespace std::literals::chrono_literals;
ExpandTuple MakeDistributedExpand(
AstStorage &storage, SymbolTable &symbol_table,
std::shared_ptr<LogicalOperator> input, Symbol input_symbol,
const std::string &edge_identifier, EdgeAtom::Direction direction,
const std::vector<storage::EdgeType> &edge_types,
const std::string &node_identifier, bool existing_node,
GraphView graph_view) {
auto edge = EDGE(edge_identifier, direction);
auto edge_sym = symbol_table.CreateSymbol(edge_identifier, true);
symbol_table[*edge->identifier_] = edge_sym;
auto node = NODE(node_identifier);
auto node_sym = symbol_table.CreateSymbol(node_identifier, true);
symbol_table[*node->identifier_] = node_sym;
auto op = std::make_shared<DistributedExpand>(node_sym, edge_sym, direction,
edge_types, input, input_symbol,
existing_node, graph_view);
return ExpandTuple{edge, edge_sym, node, node_sym, op};
}
class DistributedQueryPlan : public DistributedGraphDbTest {
protected:
DistributedQueryPlan() : DistributedGraphDbTest("query_plan") {}
@ -135,9 +157,9 @@ TEST_F(DistributedQueryPlan, PullProduceRpcWithGraphElements) {
// Use this query to test graph elements are transferred correctly in
// collections too.
auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
auto r_m =
MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
EdgeAtom::Direction::OUT, {}, "m", false, GraphView::OLD);
auto r_m = MakeDistributedExpand(storage, ctx.symbol_table_, n.op_, n.sym_,
"r", EdgeAtom::Direction::OUT, {}, "m",
false, GraphView::OLD);
auto p_sym = ctx.symbol_table_.CreateSymbol("p", true);
auto p = std::make_shared<query::plan::ConstructNamedPath>(
r_m.op_, p_sym,
@ -212,9 +234,9 @@ TEST_F(DistributedQueryPlan, Synchronize) {
AstStorage storage;
// MATCH
auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
auto r_m =
MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
EdgeAtom::Direction::BOTH, {}, "m", false, GraphView::OLD);
auto r_m = MakeDistributedExpand(storage, ctx.symbol_table_, n.op_, n.sym_,
"r", EdgeAtom::Direction::BOTH, {}, "m",
false, GraphView::OLD);
// SET
auto literal = LITERAL(42);

View File

@ -131,6 +131,7 @@ class PlanChecker : public DistributedOperatorVisitor {
}
PRE_VISIT(PullRemoteOrderBy);
PRE_VISIT(DistributedExpand);
PRE_VISIT(DistributedExpandBfs);
PRE_VISIT(DistributedCreateNode);
PRE_VISIT(DistributedCreateExpand);
@ -196,6 +197,7 @@ using ExpectOrderBy = OpChecker<OrderBy>;
using ExpectUnwind = OpChecker<Unwind>;
using ExpectDistinct = OpChecker<Distinct>;
using ExpectShowStreams = OpChecker<ShowStreams>;
using ExpectDistributedExpand = OpChecker<DistributedExpand>;
using ExpectDistributedExpandBfs = OpChecker<DistributedExpandBfs>;
using ExpectDistributedCreateExpand = OpChecker<DistributedCreateExpand>;
@ -1066,9 +1068,11 @@ TYPED_TEST(TestPlanner, MatchPathReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce()));
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1089,10 +1093,10 @@ TYPED_TEST(TestPlanner, MatchNamedPatternReturn) {
ExpectConstructNamedPath(), ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_p)});
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(),
ExpectProduce()));
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectConstructNamedPath(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectConstructNamedPath(), ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1112,11 +1116,13 @@ TYPED_TEST(TestPlanner, MatchNamedPatternWithPredicateReturn) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
ExpectConstructNamedPath(), ExpectFilter(), ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_p)});
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(),
ExpectFilter(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(),
ExpectFilter(), ExpectProduce()));
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectConstructNamedPath(), ExpectFilter(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectConstructNamedPath(), ExpectFilter(),
ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1137,16 +1143,23 @@ TYPED_TEST(TestPlanner, OptionalMatchNamedPatternReturn) {
get_symbol(edge), get_symbol(node_m)};
FakeDbAccessor dba;
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
std::list<BaseOpChecker *> optional{new ExpectScanAll(), new ExpectExpand(),
new ExpectConstructNamedPath()};
CheckPlan(planner.plan(), symbol_table,
ExpectOptional(optional_symbols, optional), ExpectProduce());
optional.push_back(new ExpectPullRemote(optional_symbols));
auto expected = ExpectDistributed(
MakeCheckers(ExpectOptional(optional_symbols, optional), ExpectProduce()),
MakeCheckers(ExpectScanAll(), ExpectExpand(),
ExpectConstructNamedPath()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
{
std::list<BaseOpChecker *> optional{new ExpectScanAll(), new ExpectExpand(),
new ExpectConstructNamedPath()};
CheckPlan(planner.plan(), symbol_table,
ExpectOptional(optional_symbols, optional), ExpectProduce());
}
{
std::list<BaseOpChecker *> optional{
new ExpectScanAll(), new ExpectDistributedExpand(),
new ExpectConstructNamedPath(), new ExpectPullRemote(optional_symbols)};
auto expected = ExpectDistributed(
MakeCheckers(ExpectOptional(optional_symbols, optional),
ExpectProduce()),
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectConstructNamedPath()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
}
TYPED_TEST(TestPlanner, MatchWhereReturn) {
@ -1283,17 +1296,19 @@ TYPED_TEST(TestPlanner, MultiMatch) {
};
ExpectPullRemote left_pull(
{get_symbol(node_n), get_symbol(edge_r), get_symbol(node_m)});
auto left_cart = MakeCheckers(ExpectScanAll(), ExpectExpand(), left_pull);
auto left_cart =
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), left_pull);
ExpectPullRemote right_pull({get_symbol(node_j), get_symbol(edge_e),
get_symbol(node_i), get_symbol(edge_f),
get_symbol(node_h)});
auto right_cart =
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectExpand(),
ExpectExpandUniquenessFilter<EdgeAccessor>(), right_pull);
auto right_cart = MakeCheckers(
ExpectScanAll(), ExpectDistributedExpand(), ExpectDistributedExpand(),
ExpectExpandUniquenessFilter<EdgeAccessor>(), right_pull);
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectProduce()),
MakeCheckers(ExpectScanAll(), ExpectExpand()),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectExpand(),
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand()),
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectDistributedExpand(),
ExpectExpandUniquenessFilter<EdgeAccessor>()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1313,9 +1328,11 @@ TYPED_TEST(TestPlanner, MultiMatchSameStart) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectProduce()));
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -1732,11 +1749,11 @@ TYPED_TEST(TestPlanner, MatchWhereBeforeExpand) {
CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectFilter(),
ExpectExpand(), ExpectProduce());
ExpectPullRemote pull({symbol_table.at(*as_n)});
auto expected =
ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectFilter(),
ExpectExpand(), ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectFilter(),
ExpectExpand(), ExpectProduce()));
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectDistributedExpand(),
ExpectProduce(), pull),
MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectDistributedExpand(),
ExpectProduce()));
CheckDistributedPlan(planner.plan(), symbol_table, expected);
}
@ -2664,12 +2681,12 @@ TYPED_TEST(TestPlanner, DistributedCartesianExpand) {
auto sym_b = symbol_table.at(*node_b->identifier_);
auto sym_e = symbol_table.at(*edge_e->identifier_);
auto sym_c = symbol_table.at(*node_c->identifier_);
auto right_cart = MakeCheckers(ExpectScanAll(), ExpectExpand(),
auto right_cart = MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
ExpectPullRemote({sym_b, sym_e, sym_c}));
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectProduce()),
MakeCheckers(ExpectScanAll()),
MakeCheckers(ExpectScanAll(), ExpectExpand()));
MakeCheckers(ExpectScanAll(), ExpectDistributedExpand()));
FakeDbAccessor dba;
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
CheckDistributedPlan(planner.plan(), symbol_table, expected);
@ -2689,8 +2706,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianExpandToExisting) {
auto sym_b = symbol_table.at(*node_b->identifier_);
auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectExpand(),
ExpectProduce()),
MakeCheckers(ExpectCartesian(left_cart, right_cart),
ExpectDistributedExpand(), ExpectProduce()),
MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
FakeDbAccessor dba;
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
@ -2711,8 +2728,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianExpandFromExisting) {
auto sym_b = symbol_table.at(*node_b->identifier_);
auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
auto expected = ExpectDistributed(
MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectExpand(),
ExpectProduce()),
MakeCheckers(ExpectCartesian(left_cart, right_cart),
ExpectDistributedExpand(), ExpectProduce()),
MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
FakeDbAccessor dba;
auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
@ -3040,7 +3057,7 @@ TYPED_TEST(TestPlanner, DistributedOptionalExpand) {
OPTIONAL_MATCH(PATTERN(node_n, edge_e, node_m)), ret_e));
auto symbol_table = MakeSymbolTable(*storage.query());
auto sym_e = symbol_table.at(*ret_e->body_.named_expressions[0]);
std::list<BaseOpChecker *> optional{new ExpectExpand()};
std::list<BaseOpChecker *> optional{new ExpectDistributedExpand()};
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectOptional(optional), ExpectProduce(),
ExpectPullRemote({sym_e})),
@ -3088,8 +3105,9 @@ TYPED_TEST(TestPlanner, DistributedOptionalScanExpandExisting) {
auto symbol_table = MakeSymbolTable(*storage.query());
auto sym_a = symbol_table.at(*node_a->identifier_);
auto sym_b = symbol_table.at(*node_b->identifier_);
std::list<BaseOpChecker *> optional{
new ExpectScanAll(), new ExpectPullRemote({sym_b}), new ExpectExpand()};
std::list<BaseOpChecker *> optional{new ExpectScanAll(),
new ExpectPullRemote({sym_b}),
new ExpectDistributedExpand()};
auto expected = ExpectDistributed(
MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}),
ExpectOptional(optional), ExpectProduce()),