Query::Plan::Optional added, MergeCursor slight mod
Reviewers: teon.banek, buda, mislav.bradac Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D321
This commit is contained in:
parent
8b8aee9b38
commit
2710485742
@ -1424,40 +1424,37 @@ Merge::MergeCursor::MergeCursor(Merge &self, GraphDbAccessor &db)
|
||||
merge_create_cursor_(self.merge_create_->MakeCursor(db)) {}
|
||||
|
||||
bool Merge::MergeCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
|
||||
// the loop is here to go back to input pull
|
||||
// when the merge_match gets exhausted
|
||||
while (true) {
|
||||
if (pull_input_) {
|
||||
if (input_cursor_->Pull(frame, symbol_table)) {
|
||||
// after a successful input from the input
|
||||
// reset merge_match (it's expand iterators maintain state)
|
||||
// and merge_create (could have a Once at the beginning)
|
||||
merge_match_cursor_->Reset();
|
||||
merge_create_cursor_->Reset();
|
||||
} else
|
||||
// input is exhausted, we're done
|
||||
return false;
|
||||
}
|
||||
if (pull_input_) {
|
||||
if (input_cursor_->Pull(frame, symbol_table)) {
|
||||
// after a successful input from the input
|
||||
// reset merge_match (it's expand iterators maintain state)
|
||||
// and merge_create (could have a Once at the beginning)
|
||||
merge_match_cursor_->Reset();
|
||||
merge_create_cursor_->Reset();
|
||||
} else
|
||||
// input is exhausted, we're done
|
||||
return false;
|
||||
}
|
||||
|
||||
// pull from the merge_match cursor
|
||||
if (merge_match_cursor_->Pull(frame, symbol_table)) {
|
||||
// if successful, next Pull from this should not pull_input_
|
||||
pull_input_ = false;
|
||||
// pull from the merge_match cursor
|
||||
if (merge_match_cursor_->Pull(frame, symbol_table)) {
|
||||
// if successful, next Pull from this should not pull_input_
|
||||
pull_input_ = false;
|
||||
return true;
|
||||
} else {
|
||||
// failed to Pull from the merge_match cursor
|
||||
if (pull_input_) {
|
||||
// if we have just now pulled from the input
|
||||
// and failed to pull from merge_match, we should create
|
||||
bool merge_create_pull_result =
|
||||
merge_create_cursor_->Pull(frame, symbol_table);
|
||||
debug_assert(merge_create_pull_result, "MergeCreate must never fail");
|
||||
return true;
|
||||
} else {
|
||||
// failed to Pull from the merge_match cursor
|
||||
if (pull_input_) {
|
||||
// if we have just now pulled from the input
|
||||
// and failed to pull from merge_match, we should create
|
||||
bool merge_create_pull_result =
|
||||
merge_create_cursor_->Pull(frame, symbol_table);
|
||||
debug_assert(merge_create_pull_result, "MergeCreate must never fail");
|
||||
return true;
|
||||
}
|
||||
// we have exhausted merge_match
|
||||
// so we should pull from input on next pull
|
||||
pull_input_ = true;
|
||||
}
|
||||
// we have exhausted merge_match_cursor_ after 1 or more successful Pulls
|
||||
// attempt next input_cursor_ pull
|
||||
pull_input_ = true;
|
||||
return Pull(frame, symbol_table);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1465,6 +1462,74 @@ void Merge::MergeCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
merge_match_cursor_->Reset();
|
||||
merge_create_cursor_->Reset();
|
||||
pull_input_ = true;
|
||||
}
|
||||
|
||||
Optional::Optional(const std::shared_ptr<LogicalOperator> &input,
|
||||
const std::shared_ptr<LogicalOperator> &optional,
|
||||
const std::vector<Symbol> &optional_symbols)
|
||||
: input_(input ? input : std::make_shared<Once>()),
|
||||
optional_(optional),
|
||||
optional_symbols_(optional_symbols) {}
|
||||
|
||||
void Optional::Accept(LogicalOperatorVisitor &visitor) {
|
||||
if (visitor.PreVisit(*this)) {
|
||||
visitor.Visit(*this);
|
||||
input_->Accept(visitor);
|
||||
optional_->Accept(visitor);
|
||||
visitor.PostVisit(*this);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<Cursor> Optional::MakeCursor(GraphDbAccessor &db) {
|
||||
return std::make_unique<OptionalCursor>(*this, db);
|
||||
}
|
||||
|
||||
Optional::OptionalCursor::OptionalCursor(Optional &self, GraphDbAccessor &db)
|
||||
: self_(self),
|
||||
input_cursor_(self.input_->MakeCursor(db)),
|
||||
optional_cursor_(self.optional_->MakeCursor(db)) {}
|
||||
|
||||
bool Optional::OptionalCursor::Pull(Frame &frame,
|
||||
const SymbolTable &symbol_table) {
|
||||
if (pull_input_) {
|
||||
if (input_cursor_->Pull(frame, symbol_table)) {
|
||||
// after a successful input from the input
|
||||
// reset optional_ (it's expand iterators maintain state)
|
||||
optional_cursor_->Reset();
|
||||
} else
|
||||
// input is exhausted, we're done
|
||||
return false;
|
||||
}
|
||||
|
||||
// pull from the optional_ cursor
|
||||
if (optional_cursor_->Pull(frame, symbol_table)) {
|
||||
// if successful, next Pull from this should not pull_input_
|
||||
pull_input_ = false;
|
||||
return true;
|
||||
} else {
|
||||
// failed to Pull from the merge_match cursor
|
||||
if (pull_input_) {
|
||||
// if we have just now pulled from the input
|
||||
// and failed to pull from optional_ so set the
|
||||
// optional symbols to Null, ensure next time the
|
||||
// input gets pulled and return true
|
||||
for (const Symbol &sym : self_.optional_symbols_)
|
||||
frame[sym] = TypedValue::Null;
|
||||
pull_input_ = true;
|
||||
return true;
|
||||
}
|
||||
// we have exhausted optional_cursor_ after 1 or more successful Pulls
|
||||
// attempt next input_cursor_ pull
|
||||
pull_input_ = true;
|
||||
return Pull(frame, symbol_table);
|
||||
}
|
||||
}
|
||||
|
||||
void Optional::OptionalCursor::Reset() {
|
||||
input_cursor_->Reset();
|
||||
optional_cursor_->Reset();
|
||||
pull_input_ = true;
|
||||
}
|
||||
|
||||
} // namespace query::plan
|
||||
|
@ -74,6 +74,7 @@ class Skip;
|
||||
class Limit;
|
||||
class OrderBy;
|
||||
class Merge;
|
||||
class Optional;
|
||||
|
||||
/** @brief Base class for visitors of @c LogicalOperator class hierarchy. */
|
||||
using LogicalOperatorVisitor =
|
||||
@ -82,7 +83,8 @@ using LogicalOperatorVisitor =
|
||||
SetProperty, SetProperties, SetLabels, RemoveProperty,
|
||||
RemoveLabels, ExpandUniquenessFilter<VertexAccessor>,
|
||||
ExpandUniquenessFilter<EdgeAccessor>, Accumulate,
|
||||
AdvanceCommand, Aggregate, Skip, Limit, OrderBy, Merge>;
|
||||
AdvanceCommand, Aggregate, Skip, Limit, OrderBy, Merge,
|
||||
Optional>;
|
||||
|
||||
/** @brief Base class for logical operators.
|
||||
*
|
||||
@ -1213,5 +1215,45 @@ class Merge : public LogicalOperator {
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Optional operator. Used for optional match. For every
|
||||
* successful Pull from the input branch a Pull from the optional
|
||||
* branch is attempted (and Pulled from till exhausted). If zero
|
||||
* Pulls succeed from the optional branch, the Optional operator
|
||||
* sets the optional symbols to TypedValue::Null on the Frame
|
||||
* and returns true, once.
|
||||
*/
|
||||
class Optional : public LogicalOperator {
|
||||
public:
|
||||
Optional(const std::shared_ptr<LogicalOperator> &input,
|
||||
const std::shared_ptr<LogicalOperator> &optional,
|
||||
const std::vector<Symbol> &optional_symbols);
|
||||
void Accept(LogicalOperatorVisitor &visitor) override;
|
||||
std::unique_ptr<Cursor> MakeCursor(GraphDbAccessor &db) override;
|
||||
|
||||
private:
|
||||
const std::shared_ptr<LogicalOperator> input_;
|
||||
const std::shared_ptr<LogicalOperator> optional_;
|
||||
const std::vector<Symbol> optional_symbols_;
|
||||
|
||||
class OptionalCursor : public Cursor {
|
||||
public:
|
||||
OptionalCursor(Optional &self, GraphDbAccessor &db);
|
||||
bool Pull(Frame &frame, const SymbolTable &symbol_table) override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
const Optional &self_;
|
||||
const std::unique_ptr<Cursor> input_cursor_;
|
||||
const std::unique_ptr<Cursor> optional_cursor_;
|
||||
// indicates if the next Pull from this cursor should
|
||||
// perform a Pull from the input_cursor_
|
||||
// this is true when:
|
||||
// - first pulling from this Cursor
|
||||
// - previous Pull from this cursor exhausted the optional_cursor_
|
||||
bool pull_input_{true};
|
||||
};
|
||||
};
|
||||
|
||||
} // namespace plan
|
||||
} // namespace query
|
||||
|
@ -263,6 +263,87 @@ TEST(QueryPlan, Expand) {
|
||||
EXPECT_EQ(8, test_expand(EdgeAtom::Direction::BOTH, GraphView::OLD));
|
||||
}
|
||||
|
||||
TEST(QueryPlan, ExpandOptional) {
|
||||
Dbms dbms;
|
||||
auto dba = dbms.active();
|
||||
|
||||
AstTreeStorage storage;
|
||||
SymbolTable symbol_table;
|
||||
|
||||
// graph (v2 {p: 2})<-[:T]-(v1 {p: 1})-[:T]->(v3 {p: 2})
|
||||
auto prop = dba->property("p");
|
||||
auto edge_type = dba->edge_type("T");
|
||||
auto v1 = dba->insert_vertex();
|
||||
v1.PropsSet(prop, 1);
|
||||
auto v2 = dba->insert_vertex();
|
||||
v2.PropsSet(prop, 2);
|
||||
dba->insert_edge(v1, v2, edge_type);
|
||||
auto v3 = dba->insert_vertex();
|
||||
v3.PropsSet(prop, 2);
|
||||
dba->insert_edge(v1, v3, edge_type);
|
||||
dba->advance_command();
|
||||
|
||||
// MATCH (n) OPTIONAL MATCH (n)-[r]->(m)
|
||||
auto n = MakeScanAll(storage, symbol_table, "n");
|
||||
auto r_m = MakeExpand(storage, symbol_table, nullptr, n.sym_,
|
||||
"r", EdgeAtom::Direction::RIGHT, false, "m", false);
|
||||
auto optional = std::make_shared<plan::Optional>(n.op_, r_m.op_,
|
||||
std::vector<Symbol>{r_m.edge_sym_, r_m.node_sym_});
|
||||
|
||||
// RETURN n, r, m
|
||||
auto n_ne = NEXPR("n", IDENT("n"));
|
||||
symbol_table[*n_ne->expression_] = n.sym_;
|
||||
symbol_table[*n_ne] = symbol_table.CreateSymbol("n");
|
||||
auto r_ne = NEXPR("r", IDENT("r"));
|
||||
symbol_table[*r_ne->expression_] = r_m.edge_sym_;
|
||||
symbol_table[*r_ne] = symbol_table.CreateSymbol("r");
|
||||
auto m_ne = NEXPR("m", IDENT("m"));
|
||||
symbol_table[*m_ne->expression_] = r_m.node_sym_;
|
||||
symbol_table[*m_ne] = symbol_table.CreateSymbol("m");
|
||||
auto produce = MakeProduce(optional, n_ne, r_ne, m_ne);
|
||||
|
||||
auto results = CollectProduce(produce, symbol_table, *dba).GetResults();
|
||||
ASSERT_EQ(4, results.size());
|
||||
int v1_is_n_count = 0;
|
||||
for (auto &row : results) {
|
||||
ASSERT_EQ(row[0].type(), TypedValue::Type::Vertex);
|
||||
VertexAccessor &va = row[0].Value<VertexAccessor>();
|
||||
auto va_p = va.PropsAt(prop);
|
||||
ASSERT_EQ(va_p.type(), PropertyValue::Type::Int);
|
||||
if (va_p.Value<int64_t>() == 1) {
|
||||
v1_is_n_count++;
|
||||
EXPECT_EQ(row[1].type(), TypedValue::Type::Edge);
|
||||
EXPECT_EQ(row[2].type(), TypedValue::Type::Vertex);
|
||||
} else {
|
||||
EXPECT_EQ(row[1].type(), TypedValue::Type::Null);
|
||||
EXPECT_EQ(row[2].type(), TypedValue::Type::Null);
|
||||
}
|
||||
}
|
||||
EXPECT_EQ(2, v1_is_n_count);
|
||||
}
|
||||
|
||||
TEST(QueryPlan, OptionalMatchEmptyDB) {
|
||||
Dbms dbms;
|
||||
auto dba = dbms.active();
|
||||
|
||||
AstTreeStorage storage;
|
||||
SymbolTable symbol_table;
|
||||
|
||||
// OPTIONAL MATCH (n)
|
||||
auto n = MakeScanAll(storage, symbol_table, "n");
|
||||
// RETURN n
|
||||
auto n_ne = NEXPR("n", IDENT("n"));
|
||||
symbol_table[*n_ne->expression_] = n.sym_;
|
||||
symbol_table[*n_ne] = symbol_table.CreateSymbol("n");
|
||||
auto optional = std::make_shared<plan::Optional>(nullptr, n.op_,
|
||||
std::vector<Symbol>{n.sym_});
|
||||
auto produce = MakeProduce(optional, n_ne);
|
||||
|
||||
auto results = CollectProduce(produce, symbol_table, *dba).GetResults();
|
||||
ASSERT_EQ(1, results.size());
|
||||
EXPECT_EQ(results[0][0].type(), TypedValue::Type::Null);
|
||||
}
|
||||
|
||||
TEST(QueryPlan, ExpandExistingNode) {
|
||||
Dbms dbms;
|
||||
auto dba = dbms.active();
|
||||
|
Loading…
Reference in New Issue
Block a user