Request correct remote symbols for non-associative aggregation
Reviewers: florijan, msantl, mculinovic Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1191
This commit is contained in:
parent
b99436910f
commit
e8608f4050
@ -9,6 +9,7 @@
|
||||
#include "boost/archive/binary_oarchive.hpp"
|
||||
|
||||
#include "query/plan/operator.hpp"
|
||||
#include "query/plan/preprocess.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
namespace query::plan {
|
||||
@ -249,9 +250,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
|
||||
if (!is_associative()) {
|
||||
auto input = aggr_op.input();
|
||||
distributed_plan_.worker_plan = input;
|
||||
std::unordered_set<Symbol> pull_symbols(aggr_op.remember().begin(),
|
||||
aggr_op.remember().end());
|
||||
for (const auto &elem : aggr_op.aggregations()) {
|
||||
UsedSymbolsCollector collector(distributed_plan_.symbol_table);
|
||||
elem.value->Accept(collector);
|
||||
if (elem.key) elem.key->Accept(collector);
|
||||
pull_symbols.insert(collector.symbols_.begin(),
|
||||
collector.symbols_.end());
|
||||
}
|
||||
aggr_op.set_input(std::make_shared<PullRemote>(
|
||||
input, distributed_plan_.plan_id,
|
||||
input->OutputSymbols(distributed_plan_.symbol_table)));
|
||||
std::vector<Symbol>(pull_symbols.begin(), pull_symbols.end())));
|
||||
return true;
|
||||
}
|
||||
auto make_ident = [this](const auto &symbol) {
|
||||
|
@ -580,6 +580,9 @@ auto GetMerge(AstTreeStorage &storage, Pattern *pattern, OnMatch on_match,
|
||||
#define AVG(expr) \
|
||||
storage.Create<query::Aggregation>((expr), nullptr, \
|
||||
query::Aggregation::Op::AVG)
|
||||
#define COLLECT_LIST(expr) \
|
||||
storage.Create<query::Aggregation>((expr), nullptr, \
|
||||
query::Aggregation::Op::COLLECT_LIST)
|
||||
#define EQ(expr1, expr2) storage.Create<query::EqualOperator>((expr1), (expr2))
|
||||
#define NEQ(expr1, expr2) \
|
||||
storage.Create<query::NotEqualOperator>((expr1), (expr2))
|
||||
|
@ -1984,6 +1984,25 @@ TYPED_TEST(TestPlanner, DistributedAvg) {
|
||||
CheckDistributedPlan(distributed_plan, expected);
|
||||
}
|
||||
|
||||
TYPED_TEST(TestPlanner, DistributedCollectList) {
|
||||
// Test MATCH (n) RETURN COLLECT(n.prop) AS res
|
||||
AstTreeStorage storage;
|
||||
database::Master db;
|
||||
database::GraphDbAccessor dba(db);
|
||||
auto prop = dba.Property("prop");
|
||||
auto node_n = NODE("n");
|
||||
auto collect = COLLECT_LIST(PROPERTY_LOOKUP("n", prop));
|
||||
QUERY(SINGLE_QUERY(MATCH(PATTERN(node_n)), RETURN(collect, AS("res"))));
|
||||
auto distributed_plan = MakeDistributedPlan<TypeParam>(storage);
|
||||
auto &symbol_table = distributed_plan.symbol_table;
|
||||
auto aggr = ExpectAggregate({collect}, {});
|
||||
ExpectPullRemote pull({symbol_table.at(*node_n->identifier_)});
|
||||
ExpectedDistributedPlan expected{
|
||||
MakeCheckers(ExpectScanAll(), pull, aggr, ExpectProduce()),
|
||||
MakeCheckers(ExpectScanAll())};
|
||||
CheckDistributedPlan(distributed_plan, expected);
|
||||
}
|
||||
|
||||
TYPED_TEST(TestPlanner, DistributedMatchCreateReturn) {
|
||||
// Test MATCH (n) CREATE (m) RETURN m
|
||||
AstTreeStorage storage;
|
||||
|
Loading…
Reference in New Issue
Block a user