Query::Plan - Distinct added

Reviewers: buda, mislav.bradac, teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D336
This commit is contained in:
florijan 2017-05-03 12:22:16 +02:00
parent b6ff1e1bbc
commit 355f62ce71
3 changed files with 134 additions and 15 deletions

View File

@ -1155,8 +1155,7 @@ void Aggregate::AggregateCursor::EnsureOkForAvgSum(
}
}
bool Aggregate::AggregateCursor::TypedValueListEqual::operator()(
const std::list<TypedValue> &left,
bool TypedValueListEqual::operator()(const std::list<TypedValue> &left,
const std::list<TypedValue> &right) const {
return std::equal(left.begin(), left.end(), right.begin(),
TypedValue::BoolEqual{});
@ -1581,4 +1580,35 @@ void Unwind::UnwindCursor::Reset() {
input_value_it_ = input_value_.end();
}
// Builds a Distinct operator over `input`.
//
// When `input` is null a freshly created Once operator is used as the input
// instead. `value_symbols` lists the symbols whose frame values together
// make up one row for the purpose of duplicate detection.
Distinct::Distinct(const std::shared_ptr<LogicalOperator> &input,
                   const std::vector<Symbol> &value_symbols)
    : input_(input != nullptr ? input : std::make_shared<Once>()),
      value_symbols_(value_symbols) {}
// Expands the ACCEPT_WITH_INPUT macro for Distinct; the macro is defined
// outside this excerpt.
// NOTE(review): presumably this supplies Distinct::Accept and forwards the
// visitor to input_ as well; confirm against the macro definition.
ACCEPT_WITH_INPUT(Distinct)
// Creates a new DistinctCursor bound to this operator and to `db`, returned
// through the generic Cursor interface.
std::unique_ptr<Cursor> Distinct::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<DistinctCursor>(*this, db);
  return cursor;
}
// Binds the cursor to its owning Distinct operator and immediately creates
// the cursor of that operator's input, using the same database accessor.
Distinct::DistinctCursor::DistinctCursor(Distinct &self, GraphDbAccessor &db)
: self_(self), input_cursor_(self.input_->MakeCursor(db)) {}
// Advances the input until it yields a row that has not been seen before.
//
// A row is the list of frame values of the operator's value symbols, taken
// in the order the symbols were given. Returns true as soon as such a row is
// found (the frame is left as the input cursor filled it), or false once the
// input cursor is exhausted.
bool Distinct::DistinctCursor::Pull(Frame &frame,
                                    const SymbolTable &symbol_table) {
  while (input_cursor_->Pull(frame, symbol_table)) {
    // Snapshot the values that identify the current row.
    std::list<TypedValue> candidate;
    for (const auto &value_symbol : self_.value_symbols_) {
      candidate.emplace_back(frame[value_symbol]);
    }

    // insert() reports through `.second` whether the row was newly added;
    // rows that were already present are skipped.
    const bool first_occurrence =
        seen_rows_.insert(std::move(candidate)).second;
    if (first_occurrence) {
      return true;
    }
  }
  return false;
}
// Resets the input cursor and forgets every row seen so far, so rows that
// were already yielded before the reset can be yielded again afterwards.
void Distinct::DistinctCursor::Reset() {
input_cursor_->Reset();
seen_rows_.clear();
}
} // namespace query::plan

View File

@ -76,6 +76,7 @@ class OrderBy;
class Merge;
class Optional;
class Unwind;
class Distinct;
/** @brief Base class for visitors of @c LogicalOperator class hierarchy. */
using LogicalOperatorVisitor = ::utils::Visitor<
@ -83,7 +84,7 @@ using LogicalOperatorVisitor = ::utils::Visitor<
Filter, Produce, Delete, SetProperty, SetProperties, SetLabels,
RemoveProperty, RemoveLabels, ExpandUniquenessFilter<VertexAccessor>,
ExpandUniquenessFilter<EdgeAccessor>, Accumulate, AdvanceCommand, Aggregate,
Skip, Limit, OrderBy, Merge, Optional, Unwind>;
Skip, Limit, OrderBy, Merge, Optional, Unwind, Distinct>;
/** @brief Base class for logical operators.
*
@ -886,6 +887,15 @@ class Accumulate : public LogicalOperator {
};
};
/**
* Custom equality function for a list of typed values.
* Used as the key-equality predicate of the hash containers in the
* Aggregate and Distinct operators (Distinct keeps an unordered_set of rows).
*
* The definition compares the lists element by element with
* TypedValue::BoolEqual over the range of `left`, so `right` must contain
* at least as many elements as `left`.
*/
struct TypedValueListEqual {
bool operator()(const std::list<TypedValue> &left,
const std::list<TypedValue> &right) const;
};
/** @brief Performs an arbitrary number of aggregations of data
* from the given input grouped by the given criteria.
*
@ -930,12 +940,6 @@ class Aggregate : public LogicalOperator {
void Reset() override;
private:
// custom equality function for the unordered map
struct TypedValueListEqual {
bool operator()(const std::list<TypedValue> &left,
const std::list<TypedValue> &right) const;
};
// Data structure for a single aggregation cache.
// does NOT include the group-by values since those
// are a key in the aggregation map.
@ -1293,5 +1297,44 @@ class Unwind : public LogicalOperator {
};
};
/**
* Ensures that only distinct rows are yielded.
* This implementation accepts a vector of Symbols
* which define a row. Only those Symbols are valid
* for use in operators following Distinct.
*
* This implementation maintains input ordering.
*
* The cursor remembers every distinct row it has yielded until it is
* reset, so memory use grows with the number of distinct rows.
*/
class Distinct : public LogicalOperator {
public:
/**
* @param input Operator supplying the rows to de-duplicate. When null,
*     a Once operator is used as the input instead.
* @param value_symbols Symbols whose frame values together define a row.
*/
Distinct(const std::shared_ptr<LogicalOperator> &input,
const std::vector<Symbol> &value_symbols);
void Accept(LogicalOperatorVisitor &visitor) override;
/** Creates a cursor that pulls from the input and skips repeated rows. */
std::unique_ptr<Cursor> MakeCursor(GraphDbAccessor &db) override;
private:
// operator whose output is being de-duplicated
const std::shared_ptr<LogicalOperator> input_;
// symbols whose values, in this order, form the row that is compared
const std::vector<Symbol> value_symbols_;
class DistinctCursor : public Cursor {
public:
DistinctCursor(Distinct &self, GraphDbAccessor &db);
// Pulls from the input until a row not seen before is found; returns
// false once the input is exhausted.
bool Pull(Frame &frame, const SymbolTable &symbol_table) override;
// Resets the input cursor and clears the set of seen rows.
void Reset() override;
private:
// the operator this cursor executes
const Distinct &self_;
// cursor over the operator's input, created in the constructor
const std::unique_ptr<Cursor> input_cursor_;
// a set of already seen rows
std::unordered_set<
std::list<TypedValue>,
// use FNV collection hashing specialized for a list of TypedValues
FnvCollection<std::list<TypedValue>, TypedValue, TypedValue::Hash>,
TypedValueListEqual>
seen_rows_;
};
};
} // namespace plan
} // namespace query

View File

@ -285,10 +285,10 @@ TEST(QueryPlan, ExpandOptional) {
// MATCH (n) OPTIONAL MATCH (n)-[r]->(m)
auto n = MakeScanAll(storage, symbol_table, "n");
auto r_m = MakeExpand(storage, symbol_table, nullptr, n.sym_,
"r", EdgeAtom::Direction::RIGHT, false, "m", false);
auto optional = std::make_shared<plan::Optional>(n.op_, r_m.op_,
std::vector<Symbol>{r_m.edge_sym_, r_m.node_sym_});
auto r_m = MakeExpand(storage, symbol_table, nullptr, n.sym_, "r",
EdgeAtom::Direction::RIGHT, false, "m", false);
auto optional = std::make_shared<plan::Optional>(
n.op_, r_m.op_, std::vector<Symbol>{r_m.edge_sym_, r_m.node_sym_});
// RETURN n, r, m
auto n_ne = NEXPR("n", IDENT("n"));
@ -642,3 +642,49 @@ TEST(QueryPlan, ExpandUniquenessFilter) {
EXPECT_EQ(0, check_expand_results(true, false));
EXPECT_EQ(1, check_expand_results(false, true));
}
TEST(QueryPlan, Distinct) {
  // Exercises plans equivalent to queries like
  //   UNWIND [1, 2, 3, 3] AS x RETURN DISTINCT x

  Dbms dbms;
  auto dba = dbms.active();
  AstTreeStorage storage;
  SymbolTable symbol_table;

  // Unwinds `input`, runs it through Distinct and verifies that the produced
  // single-column rows match `output` in count, order, type AND value.
  //
  // Values are compared with TypedValue::BoolEqual, the same predicate the
  // Distinct operator uses for its row set, so every value type (including
  // Null, which Distinct treats as equal to Null) is actually checked. The
  // previous version compared values only for all-integer inputs and checked
  // nothing but the type for the mixed-type case.
  auto check_distinct = [&](const std::vector<TypedValue> &input,
                            const std::vector<TypedValue> &output) {
    auto input_expr = LITERAL(TypedValue(input));

    auto x = symbol_table.CreateSymbol("x");
    auto unwind = std::make_shared<plan::Unwind>(nullptr, input_expr, x);
    auto x_expr = IDENT("x");
    symbol_table[*x_expr] = x;

    auto distinct =
        std::make_shared<plan::Distinct>(unwind, std::vector<Symbol>{x});

    auto x_ne = NEXPR("x", x_expr);
    symbol_table[*x_ne] = symbol_table.CreateSymbol("x_ne");
    auto produce = MakeProduce(distinct, x_ne);

    auto results = CollectProduce(produce, symbol_table, *dba).GetResults();
    // The size check also guarantees that output_it stays in range below.
    ASSERT_EQ(output.size(), results.size());
    auto output_it = output.begin();
    size_t row_index = 0;
    for (const auto &row : results) {
      ASSERT_EQ(1, row.size());
      // Check the type separately so that values of different types are
      // never accepted as a match.
      ASSERT_EQ(row[0].type(), output_it->type());
      EXPECT_TRUE(TypedValue::BoolEqual{}(*output_it, row[0]))
          << "unexpected value in result row " << row_index;
      ++output_it;
      ++row_index;
    }
  };

  check_distinct({1, 1, 2, 3, 3, 3}, {1, 2, 3});
  check_distinct({3, 2, 3, 5, 3, 5, 2, 1, 2}, {3, 2, 5, 1});
  check_distinct(
      {3, "two", TypedValue::Null, 3, true, false, "TWO", TypedValue::Null},
      {3, "two", TypedValue::Null, true, false, "TWO"});
}