#include #include #include #include #include #include #include #include #include "query/frontend/ast/ast.hpp" #include "query/frontend/semantic/symbol_generator.hpp" #include "query/frontend/semantic/symbol_table.hpp" #include "query/plan/distributed.hpp" #include "query/plan/distributed_ops.hpp" #include "query/plan/operator.hpp" #include "query/plan/planner.hpp" #include #include "query_common.hpp" namespace query { ::std::ostream &operator<<(::std::ostream &os, const Symbol &sym) { return os << "Symbol{\"" << sym.name() << "\" [" << sym.position() << "] " << Symbol::TypeToString(sym.type()) << "}"; } } // namespace query using namespace query::plan; using query::AstStorage; using query::SingleQuery; using query::Symbol; using query::SymbolGenerator; using query::SymbolTable; using Direction = query::EdgeAtom::Direction; using Bound = ScanAllByLabelPropertyRange::Bound; namespace { class BaseOpChecker { public: virtual ~BaseOpChecker() {} virtual void CheckOp(LogicalOperator &, const SymbolTable &) = 0; }; class PlanChecker : public DistributedOperatorVisitor { public: using HierarchicalLogicalOperatorVisitor::PostVisit; using HierarchicalLogicalOperatorVisitor::PreVisit; using HierarchicalLogicalOperatorVisitor::Visit; PlanChecker(const std::list> &checkers, const SymbolTable &symbol_table) : symbol_table_(symbol_table) { for (const auto &checker : checkers) checkers_.emplace_back(checker.get()); } PlanChecker(const std::list &checkers, const SymbolTable &symbol_table) : checkers_(checkers), symbol_table_(symbol_table) {} #define PRE_VISIT(TOp) \ bool PreVisit(TOp &op) override { \ CheckOp(op); \ return true; \ } #define VISIT(TOp) \ bool Visit(TOp &op) override { \ CheckOp(op); \ return true; \ } PRE_VISIT(CreateNode); PRE_VISIT(CreateExpand); PRE_VISIT(Delete); PRE_VISIT(ScanAll); PRE_VISIT(ScanAllByLabel); PRE_VISIT(ScanAllByLabelPropertyValue); PRE_VISIT(ScanAllByLabelPropertyRange); PRE_VISIT(Expand); PRE_VISIT(ExpandVariable); PRE_VISIT(Filter); 
PRE_VISIT(ConstructNamedPath); PRE_VISIT(Produce); PRE_VISIT(SetProperty); PRE_VISIT(SetProperties); PRE_VISIT(SetLabels); PRE_VISIT(RemoveProperty); PRE_VISIT(RemoveLabels); PRE_VISIT(ExpandUniquenessFilter); PRE_VISIT(ExpandUniquenessFilter); PRE_VISIT(Accumulate); PRE_VISIT(Aggregate); PRE_VISIT(Skip); PRE_VISIT(Limit); PRE_VISIT(OrderBy); bool PreVisit(Merge &op) override { CheckOp(op); op.input()->Accept(*this); return false; } bool PreVisit(Optional &op) override { CheckOp(op); op.input()->Accept(*this); return false; } PRE_VISIT(Unwind); PRE_VISIT(Distinct); bool Visit(Once &) override { // Ignore checking Once, it is implicitly at the end. return true; } VISIT(CreateIndex); PRE_VISIT(PullRemote); bool PreVisit(Synchronize &op) override { CheckOp(op); op.input()->Accept(*this); return false; } bool PreVisit(Cartesian &op) override { CheckOp(op); return false; } PRE_VISIT(PullRemoteOrderBy); PRE_VISIT(DistributedExpand); PRE_VISIT(DistributedExpandBfs); PRE_VISIT(DistributedCreateNode); PRE_VISIT(DistributedCreateExpand); VISIT(AuthHandler); VISIT(CreateStream); VISIT(DropStream); VISIT(ShowStreams); VISIT(StartStopStream); VISIT(StartStopAllStreams); VISIT(TestStream); PRE_VISIT(Explain); #undef PRE_VISIT #undef VISIT std::list checkers_; private: void CheckOp(LogicalOperator &op) { ASSERT_FALSE(checkers_.empty()); checkers_.back()->CheckOp(op, symbol_table_); checkers_.pop_back(); } const SymbolTable &symbol_table_; }; template class OpChecker : public BaseOpChecker { public: void CheckOp(LogicalOperator &op, const SymbolTable &symbol_table) override { auto *expected_op = dynamic_cast(&op); ASSERT_TRUE(expected_op); ExpectOp(*expected_op, symbol_table); } virtual void ExpectOp(TOp &, const SymbolTable &) {} }; using ExpectCreateNode = OpChecker; using ExpectCreateExpand = OpChecker; using ExpectDelete = OpChecker; using ExpectScanAll = OpChecker; using ExpectScanAllByLabel = OpChecker; using ExpectExpand = OpChecker; using ExpectFilter = OpChecker; using 
ExpectConstructNamedPath = OpChecker; using ExpectProduce = OpChecker; using ExpectSetProperty = OpChecker; using ExpectSetProperties = OpChecker; using ExpectSetLabels = OpChecker; using ExpectRemoveProperty = OpChecker; using ExpectRemoveLabels = OpChecker; template using ExpectExpandUniquenessFilter = OpChecker>; using ExpectSkip = OpChecker; using ExpectLimit = OpChecker; using ExpectOrderBy = OpChecker; using ExpectUnwind = OpChecker; using ExpectDistinct = OpChecker; using ExpectShowStreams = OpChecker; using ExpectDistributedExpand = OpChecker; using ExpectDistributedExpandBfs = OpChecker; using ExpectDistributedCreateExpand = OpChecker; class ExpectExpandVariable : public OpChecker { public: void ExpectOp(ExpandVariable &op, const SymbolTable &) override { EXPECT_EQ(op.type_, query::EdgeAtom::Type::DEPTH_FIRST); } }; class ExpectExpandBfs : public OpChecker { public: void ExpectOp(ExpandVariable &op, const SymbolTable &) override { EXPECT_EQ(op.type_, query::EdgeAtom::Type::BREADTH_FIRST); } }; class ExpectAccumulate : public OpChecker { public: explicit ExpectAccumulate(const std::unordered_set &symbols) : symbols_(symbols) {} void ExpectOp(Accumulate &op, const SymbolTable &) override { std::unordered_set got_symbols(op.symbols_.begin(), op.symbols_.end()); EXPECT_EQ(symbols_, got_symbols); } private: const std::unordered_set symbols_; }; class ExpectAggregate : public OpChecker { public: ExpectAggregate(bool is_master, const std::vector &aggregations, const std::unordered_set &group_by) : is_master_(is_master), aggregations_(aggregations), group_by_(group_by) {} ExpectAggregate(const std::vector &aggregations, const std::unordered_set &group_by) : is_master_(false), aggregations_(aggregations), group_by_(group_by) {} void ExpectOp(Aggregate &op, const SymbolTable &symbol_table) override { auto aggr_it = aggregations_.begin(); for (const auto &aggr_elem : op.aggregations_) { ASSERT_NE(aggr_it, aggregations_.end()); auto aggr = *aggr_it++; // TODO: Proper 
expression equality EXPECT_EQ(typeid(aggr_elem.value).hash_code(), typeid(aggr->expression1_).hash_code()); EXPECT_EQ(typeid(aggr_elem.key).hash_code(), typeid(aggr->expression2_).hash_code()); EXPECT_EQ(aggr_elem.op, aggr->op_); if (!is_master_) { // Skip checking virtual merge aggregation symbol when the plan is // distributed. EXPECT_EQ(aggr_elem.output_sym, symbol_table.at(*aggr)); } } EXPECT_EQ(aggr_it, aggregations_.end()); // TODO: Proper group by expression equality std::unordered_set got_group_by; for (auto *expr : op.group_by_) got_group_by.insert(typeid(*expr).hash_code()); std::unordered_set expected_group_by; for (auto *expr : group_by_) expected_group_by.insert(typeid(*expr).hash_code()); EXPECT_EQ(got_group_by, expected_group_by); } private: bool is_master_ = false; std::vector aggregations_; std::unordered_set group_by_; }; auto ExpectMasterAggregate( const std::vector &aggregations, const std::unordered_set &group_by) { return ExpectAggregate(true, aggregations, group_by); } class ExpectMerge : public OpChecker { public: ExpectMerge(const std::list &on_match, const std::list &on_create) : on_match_(on_match), on_create_(on_create) {} void ExpectOp(Merge &merge, const SymbolTable &symbol_table) override { PlanChecker check_match(on_match_, symbol_table); merge.merge_match_->Accept(check_match); PlanChecker check_create(on_create_, symbol_table); merge.merge_create_->Accept(check_create); } private: const std::list &on_match_; const std::list &on_create_; }; class ExpectOptional : public OpChecker { public: explicit ExpectOptional(const std::list &optional) : optional_(optional) {} ExpectOptional(const std::vector &optional_symbols, const std::list &optional) : optional_symbols_(optional_symbols), optional_(optional) {} void ExpectOp(Optional &optional, const SymbolTable &symbol_table) override { if (!optional_symbols_.empty()) { EXPECT_THAT(optional.optional_symbols_, testing::UnorderedElementsAreArray(optional_symbols_)); } PlanChecker 
check_optional(optional_, symbol_table); optional.optional_->Accept(check_optional); } private: std::vector optional_symbols_; const std::list &optional_; }; class ExpectScanAllByLabelPropertyValue : public OpChecker { public: ExpectScanAllByLabelPropertyValue( storage::Label label, const std::pair &prop_pair, query::Expression *expression) : label_(label), property_(prop_pair.second), expression_(expression) {} void ExpectOp(ScanAllByLabelPropertyValue &scan_all, const SymbolTable &) override { EXPECT_EQ(scan_all.label_, label_); EXPECT_EQ(scan_all.property_, property_); // TODO: Proper expression equality EXPECT_EQ(typeid(scan_all.expression_).hash_code(), typeid(expression_).hash_code()); } private: storage::Label label_; storage::Property property_; query::Expression *expression_; }; class ExpectScanAllByLabelPropertyRange : public OpChecker { public: ExpectScanAllByLabelPropertyRange( storage::Label label, storage::Property property, std::experimental::optional lower_bound, std::experimental::optional upper_bound) : label_(label), property_(property), lower_bound_(lower_bound), upper_bound_(upper_bound) {} void ExpectOp(ScanAllByLabelPropertyRange &scan_all, const SymbolTable &) override { EXPECT_EQ(scan_all.label_, label_); EXPECT_EQ(scan_all.property_, property_); if (lower_bound_) { ASSERT_TRUE(scan_all.lower_bound_); // TODO: Proper expression equality EXPECT_EQ(typeid(scan_all.lower_bound_->value()).hash_code(), typeid(lower_bound_->value()).hash_code()); EXPECT_EQ(scan_all.lower_bound_->type(), lower_bound_->type()); } if (upper_bound_) { ASSERT_TRUE(scan_all.upper_bound_); // TODO: Proper expression equality EXPECT_EQ(typeid(scan_all.upper_bound_->value()).hash_code(), typeid(upper_bound_->value()).hash_code()); EXPECT_EQ(scan_all.upper_bound_->type(), upper_bound_->type()); } } private: storage::Label label_; storage::Property property_; std::experimental::optional lower_bound_; std::experimental::optional upper_bound_; }; class ExpectAuthHandler : 
public OpChecker { public: ExpectAuthHandler(query::AuthQuery::Action action, std::string user, std::string role, std::string user_or_role, query::Expression *password, std::vector privileges) : action_(action), user_(user), role_(role), user_or_role_(user_or_role), password_(password), privileges_(privileges) {} void ExpectOp(AuthHandler &auth_handler, const SymbolTable &) override { EXPECT_EQ(auth_handler.action_, action_); EXPECT_EQ(auth_handler.user_, user_); EXPECT_EQ(auth_handler.role_, role_); EXPECT_EQ(auth_handler.user_or_role_, user_or_role_); // TODO(mtomic): We need to somehow test the password expression. EXPECT_TRUE(password_); EXPECT_TRUE(auth_handler.password_); EXPECT_EQ(auth_handler.privileges_, privileges_); } private: query::AuthQuery::Action action_; std::string user_; std::string role_; std::string user_or_role_; query::Expression *password_{nullptr}; std::vector privileges_; }; class ExpectCreateIndex : public OpChecker { public: ExpectCreateIndex(storage::Label label, storage::Property property) : label_(label), property_(property) {} void ExpectOp(CreateIndex &create_index, const SymbolTable &) override { EXPECT_EQ(create_index.label_, label_); EXPECT_EQ(create_index.property_, property_); } private: storage::Label label_; storage::Property property_; }; class ExpectPullRemote : public OpChecker { public: ExpectPullRemote() {} ExpectPullRemote(const std::vector &symbols) : symbols_(symbols) {} void ExpectOp(PullRemote &op, const SymbolTable &) override { EXPECT_THAT(op.symbols_, testing::UnorderedElementsAreArray(symbols_)); } private: std::vector symbols_; }; class ExpectSynchronize : public OpChecker { public: explicit ExpectSynchronize(bool advance_command) : has_pull_(false), advance_command_(advance_command) {} ExpectSynchronize(const std::vector &symbols = {}, bool advance_command = false) : expect_pull_(symbols), has_pull_(true), advance_command_(advance_command) {} void ExpectOp(Synchronize &op, const SymbolTable &symbol_table) 
override { if (has_pull_) { ASSERT_TRUE(op.pull_remote_); expect_pull_.ExpectOp(*op.pull_remote_, symbol_table); } else { EXPECT_FALSE(op.pull_remote_); } EXPECT_EQ(op.advance_command_, advance_command_); } private: ExpectPullRemote expect_pull_; bool has_pull_ = true; bool advance_command_ = false; }; class ExpectCartesian : public OpChecker { public: ExpectCartesian(const std::list> &left, const std::list> &right) : left_(left), right_(right) {} void ExpectOp(Cartesian &op, const SymbolTable &symbol_table) override { ASSERT_TRUE(op.left_op_); PlanChecker left_checker(left_, symbol_table); op.left_op_->Accept(left_checker); ASSERT_TRUE(op.right_op_); PlanChecker right_checker(right_, symbol_table); op.right_op_->Accept(right_checker); } private: const std::list> &left_; const std::list> &right_; }; class ExpectDistributedCreateNode : public OpChecker { public: ExpectDistributedCreateNode(bool on_random_worker = false) : on_random_worker_(on_random_worker) {} void ExpectOp(DistributedCreateNode &op, const SymbolTable &) override { EXPECT_EQ(op.on_random_worker_, on_random_worker_); } private: bool on_random_worker_ = false; }; class ExpectPullRemoteOrderBy : public OpChecker { public: ExpectPullRemoteOrderBy(const std::vector symbols) : symbols_(symbols) {} void ExpectOp(PullRemoteOrderBy &op, const SymbolTable &) override { EXPECT_THAT(op.symbols_, testing::UnorderedElementsAreArray(symbols_)); } private: std::vector symbols_; }; class ExpectCreateStream : public OpChecker { public: ExpectCreateStream(std::string stream_name, query::Expression *stream_uri, query::Expression *stream_topic, query::Expression *transform_uri, query::Expression *batch_interval_in_ms, query::Expression *batch_size) : stream_name_(stream_name), stream_uri_(stream_uri), stream_topic_(stream_topic), transform_uri_(transform_uri), batch_interval_in_ms_(batch_interval_in_ms), batch_size_(batch_size) {} void ExpectOp(CreateStream &create_stream, const SymbolTable &) override { 
EXPECT_EQ(create_stream.stream_name_, stream_name_); // TODO: Proper expression equality EXPECT_EQ(typeid(create_stream.stream_uri_).hash_code(), typeid(stream_uri_).hash_code()); EXPECT_EQ(typeid(create_stream.stream_topic_).hash_code(), typeid(stream_topic_).hash_code()); EXPECT_EQ(typeid(create_stream.transform_uri_).hash_code(), typeid(transform_uri_).hash_code()); if (batch_interval_in_ms_ && create_stream.batch_interval_in_ms_) { EXPECT_EQ(typeid(create_stream.batch_interval_in_ms_).hash_code(), typeid(batch_interval_in_ms_).hash_code()); } else { EXPECT_TRUE(batch_interval_in_ms_ == nullptr && create_stream.batch_interval_in_ms_ == nullptr); } if (batch_size_ && create_stream.batch_size_) { EXPECT_EQ(typeid(create_stream.batch_size_).hash_code(), typeid(batch_size_).hash_code()); } else { EXPECT_TRUE(batch_size_ == nullptr && create_stream.batch_size_ == nullptr); } } private: std::string stream_name_; query::Expression *stream_uri_; query::Expression *stream_topic_; query::Expression *transform_uri_; query::Expression *batch_interval_in_ms_; query::Expression *batch_size_; }; class ExpectDropStream : public OpChecker { public: explicit ExpectDropStream(std::string stream_name) : stream_name_(stream_name) {} void ExpectOp(DropStream &drop_stream, const SymbolTable &) override { EXPECT_EQ(drop_stream.stream_name_, stream_name_); } private: std::string stream_name_; }; class ExpectStartStopStream : public OpChecker { public: ExpectStartStopStream(std::string stream_name, bool is_start, query::Expression *limit_batches) : stream_name_(stream_name), is_start_(is_start), limit_batches_(limit_batches) {} void ExpectOp(StartStopStream &start_stop_stream, const SymbolTable &) override { EXPECT_EQ(start_stop_stream.stream_name_, stream_name_); EXPECT_EQ(start_stop_stream.is_start_, is_start_); // TODO: Proper expression equality if (limit_batches_ && start_stop_stream.limit_batches_) { EXPECT_EQ(typeid(start_stop_stream.limit_batches_).hash_code(), 
typeid(limit_batches_).hash_code()); } else { EXPECT_TRUE(limit_batches_ == nullptr && start_stop_stream.limit_batches_ == nullptr); } } private: std::string stream_name_; bool is_start_; query::Expression *limit_batches_; }; class ExpectStartStopAllStreams : public OpChecker { public: explicit ExpectStartStopAllStreams(bool is_start) : is_start_(is_start) {} void ExpectOp(StartStopAllStreams &start_stop_all_streams, const SymbolTable &) override { EXPECT_EQ(start_stop_all_streams.is_start_, is_start_); } private: bool is_start_; }; class ExpectTestStream : public OpChecker { public: ExpectTestStream(std::string stream_name, query::Expression *limit_batches) : stream_name_(stream_name), limit_batches_(limit_batches) {} void ExpectOp(TestStream &test_stream, const SymbolTable &) override { EXPECT_EQ(test_stream.stream_name_, stream_name_); // TODO: Proper expression equality if (limit_batches_ && test_stream.limit_batches_) { EXPECT_EQ(typeid(test_stream.limit_batches_).hash_code(), typeid(limit_batches_).hash_code()); } else { EXPECT_TRUE(limit_batches_ == nullptr && test_stream.limit_batches_ == nullptr); } } private: std::string stream_name_; query::Expression *limit_batches_; }; auto MakeSymbolTable(query::Query &query) { SymbolTable symbol_table; SymbolGenerator symbol_generator(symbol_table); query.Accept(symbol_generator); return symbol_table; } class Planner { public: template Planner(std::vector single_query_parts, PlanningContext &context) { plan_ = MakeLogicalPlanForSingleQuery(single_query_parts, context); } auto &plan() { return *plan_; } private: std::unique_ptr plan_; }; void SavePlan(const LogicalOperator &plan, ::capnp::MessageBuilder *message) { auto builder = message->initRoot(); LogicalOperator::SaveHelper helper; Save(plan, &builder, &helper); } auto LoadPlan(const ::query::plan::capnp::LogicalOperator::Reader &reader) { std::unique_ptr plan; LogicalOperator::LoadHelper helper; Load(&plan, reader, &helper); return std::make_pair(std::move(plan), 
std::move(helper.ast_storage)); } class CapnpPlanner { public: template CapnpPlanner(std::vector single_query_parts, PlanningContext &context) { ::capnp::MallocMessageBuilder message; { auto original_plan = MakeLogicalPlanForSingleQuery( single_query_parts, context); SavePlan(*original_plan, &message); } { auto reader = message.getRoot(); std::tie(plan_, ast_storage_) = LoadPlan(reader); } } auto &plan() { return *plan_; } private: AstStorage ast_storage_; std::unique_ptr plan_; }; class FakeDbAccessor { public: int64_t VerticesCount(storage::Label label) const { auto found = label_index_.find(label); if (found != label_index_.end()) return found->second; return 0; } int64_t VerticesCount(storage::Label label, storage::Property property) const { for (auto &index : label_property_index_) { if (std::get<0>(index) == label && std::get<1>(index) == property) { return std::get<2>(index); } } return 0; } bool LabelPropertyIndexExists(storage::Label label, storage::Property property) const { for (auto &index : label_property_index_) { if (std::get<0>(index) == label && std::get<1>(index) == property) { return true; } } return false; } void SetIndexCount(storage::Label label, int64_t count) { label_index_[label] = count; } void SetIndexCount(storage::Label label, storage::Property property, int64_t count) { for (auto &index : label_property_index_) { if (std::get<0>(index) == label && std::get<1>(index) == property) { std::get<2>(index) = count; return; } } label_property_index_.emplace_back(label, property, count); } storage::Label Label(const std::string &name) { auto found = labels_.find(name); if (found != labels_.end()) return found->second; return labels_.emplace(name, storage::Label(labels_.size())).first->second; } storage::EdgeType EdgeType(const std::string &name) { auto found = edge_types_.find(name); if (found != edge_types_.end()) return found->second; return edge_types_.emplace(name, storage::EdgeType(edge_types_.size())) .first->second; } storage::Property 
Property(const std::string &name) { auto found = properties_.find(name); if (found != properties_.end()) return found->second; return properties_.emplace(name, storage::Property(properties_.size())) .first->second; } std::string PropertyName(storage::Property property) const { for (const auto &kv : properties_) { if (kv.second == property) return kv.first; } LOG(FATAL) << "Unable to find property name"; } private: std::unordered_map labels_; std::unordered_map edge_types_; std::unordered_map properties_; std::unordered_map label_index_; std::vector> label_property_index_; }; template TPlanner MakePlanner(const TDbAccessor &dba, AstStorage &storage, SymbolTable &symbol_table) { auto planning_context = MakePlanningContext(storage, symbol_table, dba); auto query_parts = CollectQueryParts(symbol_table, storage); auto single_query_parts = query_parts.query_parts.at(0).single_query_parts; return TPlanner(single_query_parts, planning_context); } template auto CheckPlan(LogicalOperator &plan, const SymbolTable &symbol_table, TChecker... checker) { std::list checkers{&checker...}; PlanChecker plan_checker(checkers, symbol_table); plan.Accept(plan_checker); EXPECT_TRUE(plan_checker.checkers_.empty()); } template auto CheckPlan(AstStorage &storage, TChecker... 
checker) { auto symbol_table = MakeSymbolTable(*storage.query()); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, checker...); } struct ExpectedDistributedPlan { std::list> master_checkers; std::vector>> worker_checkers; }; template DistributedPlan MakeDistributedPlan(query::AstStorage &storage) { auto symbol_table = MakeSymbolTable(*storage.query()); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); std::atomic next_plan_id{0}; return MakeDistributedPlan(planner.plan(), symbol_table, next_plan_id); } void CheckDistributedPlan(DistributedPlan &distributed_plan, ExpectedDistributedPlan &expected) { PlanChecker plan_checker(expected.master_checkers, distributed_plan.symbol_table); distributed_plan.master_plan->Accept(plan_checker); EXPECT_TRUE(plan_checker.checkers_.empty()); if (expected.worker_checkers.empty()) { EXPECT_TRUE(distributed_plan.worker_plans.empty()); } else { ASSERT_EQ(distributed_plan.worker_plans.size(), expected.worker_checkers.size()); for (size_t i = 0; i < expected.worker_checkers.size(); ++i) { PlanChecker plan_checker(expected.worker_checkers[i], distributed_plan.symbol_table); auto worker_plan = distributed_plan.worker_plans[i].second; worker_plan->Accept(plan_checker); EXPECT_TRUE(plan_checker.checkers_.empty()); } } } void CheckDistributedPlan(const LogicalOperator &plan, const SymbolTable &symbol_table, ExpectedDistributedPlan &expected_distributed_plan) { std::atomic next_plan_id{0}; auto distributed_plan = MakeDistributedPlan(plan, symbol_table, next_plan_id); EXPECT_EQ(next_plan_id - 1, distributed_plan.worker_plans.size()); CheckDistributedPlan(distributed_plan, expected_distributed_plan); } template void CheckDistributedPlan(AstStorage &storage, ExpectedDistributedPlan &expected_distributed_plan) { auto distributed_plan = MakeDistributedPlan(storage); CheckDistributedPlan(distributed_plan, expected_distributed_plan); } template std::list> 
MakeCheckers(T arg) { std::list> l; l.emplace_back(std::make_unique(arg)); return l; } template std::list> MakeCheckers(T arg, Rest &&... rest) { auto l = MakeCheckers(std::forward(rest)...); l.emplace_front(std::make_unique(arg)); return std::move(l); } ExpectedDistributedPlan ExpectDistributed( std::list> master_checker) { return ExpectedDistributedPlan{std::move(master_checker)}; } ExpectedDistributedPlan ExpectDistributed( std::list> master_checker, std::list> worker_checker) { ExpectedDistributedPlan expected{std::move(master_checker)}; expected.worker_checkers.emplace_back(std::move(worker_checker)); return expected; } void AddWorkerCheckers( ExpectedDistributedPlan &expected, std::list> worker_checker) { expected.worker_checkers.emplace_back(std::move(worker_checker)); } template void AddWorkerCheckers(ExpectedDistributedPlan &expected, std::list> worker_checker, Rest &&... rest) { expected.worker_checkers.emplace_back(std::move(worker_checker)); AddWorkerCheckers(expected, std::forward(rest)...); } template ExpectedDistributedPlan ExpectDistributed( std::list> master_checker, std::list> worker_checker, Rest &&... 
rest) { ExpectedDistributedPlan expected{std::move(master_checker)}; expected.worker_checkers.emplace_back(std::move(worker_checker)); AddWorkerCheckers(expected, std::forward(rest)...); return expected; } template class TestPlanner : public ::testing::Test {}; using PlannerTypes = ::testing::Types; TYPED_TEST_CASE(TestPlanner, PlannerTypes); TYPED_TEST(TestPlanner, MatchNodeReturn) { // Test MATCH (n) RETURN n AstStorage storage; auto *as_n = NEXPR("n", IDENT("n")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), RETURN(as_n))); auto symbol_table = MakeSymbolTable(*storage.query()); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_n)}); auto expected = ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, CreateNodeReturn) { // Test CREATE (n) RETURN n AS n AstStorage storage; auto ident_n = IDENT("n"); auto query = QUERY(SINGLE_QUERY(CREATE(PATTERN(NODE("n"))), RETURN(ident_n, AS("n")))); auto symbol_table = MakeSymbolTable(*query); auto acc = ExpectAccumulate({symbol_table.at(*ident_n)}); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc, ExpectProduce()); { auto expected = ExpectDistributed( MakeCheckers(ExpectDistributedCreateNode(true), ExpectSynchronize(false), ExpectProduce())); std::atomic next_plan_id{0}; auto distributed_plan = MakeDistributedPlan(planner.plan(), symbol_table, next_plan_id); CheckDistributedPlan(distributed_plan, expected); } } TYPED_TEST(TestPlanner, CreateExpand) { // Test CREATE (n) -[r :rel1]-> (m) AstStorage storage; FakeDbAccessor dba; auto relationship = dba.EdgeType("relationship"); QUERY(SINGLE_QUERY(CREATE(PATTERN( NODE("n"), EDGE("r", 
Direction::OUT, {relationship}), NODE("m"))))); CheckPlan(storage, ExpectCreateNode(), ExpectCreateExpand()); ExpectedDistributedPlan expected{ MakeCheckers(ExpectDistributedCreateNode(true), ExpectDistributedCreateExpand(), ExpectSynchronize(false)), {}}; CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, CreateMultipleNode) { // Test CREATE (n), (m) AstStorage storage; QUERY(SINGLE_QUERY(CREATE(PATTERN(NODE("n")), PATTERN(NODE("m"))))); CheckPlan(storage, ExpectCreateNode(), ExpectCreateNode()); ExpectedDistributedPlan expected{ MakeCheckers(ExpectDistributedCreateNode(true), ExpectDistributedCreateNode(true), ExpectSynchronize(false)), {}}; CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, CreateNodeExpandNode) { // Test CREATE (n) -[r :rel]-> (m), (l) AstStorage storage; FakeDbAccessor dba; auto relationship = dba.EdgeType("rel"); QUERY(SINGLE_QUERY(CREATE( PATTERN(NODE("n"), EDGE("r", Direction::OUT, {relationship}), NODE("m")), PATTERN(NODE("l"))))); CheckPlan(storage, ExpectCreateNode(), ExpectCreateExpand(), ExpectCreateNode()); ExpectedDistributedPlan expected{ MakeCheckers(ExpectDistributedCreateNode(true), ExpectDistributedCreateExpand(), ExpectDistributedCreateNode(true), ExpectSynchronize(false)), {}}; CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, CreateNamedPattern) { // Test CREATE p = (n) -[r :rel]-> (m) AstStorage storage; FakeDbAccessor dba; auto relationship = dba.EdgeType("rel"); QUERY(SINGLE_QUERY(CREATE(NAMED_PATTERN( "p", NODE("n"), EDGE("r", Direction::OUT, {relationship}), NODE("m"))))); CheckPlan(storage, ExpectCreateNode(), ExpectCreateExpand(), ExpectConstructNamedPath()); ExpectedDistributedPlan expected{ MakeCheckers(ExpectDistributedCreateNode(true), ExpectDistributedCreateExpand(), ExpectConstructNamedPath(), ExpectSynchronize(false)), {}}; CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, MatchCreateExpand) { // Test MATCH (n) CREATE (n) -[r :rel1]-> (m) 
AstStorage storage; FakeDbAccessor dba; auto relationship = dba.EdgeType("relationship"); QUERY(SINGLE_QUERY( MATCH(PATTERN(NODE("n"))), CREATE(PATTERN(NODE("n"), EDGE("r", Direction::OUT, {relationship}), NODE("m"))))); CheckPlan(storage, ExpectScanAll(), ExpectCreateExpand()); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectDistributedCreateExpand(), ExpectSynchronize()), MakeCheckers(ExpectScanAll(), ExpectDistributedCreateExpand())); CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, MatchLabeledNodes) { // Test MATCH (n :label) RETURN n AstStorage storage; FakeDbAccessor dba; auto label = dba.Label("label"); auto *as_n = NEXPR("n", IDENT("n")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", label))), RETURN(as_n))); auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAllByLabel(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_n)}); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAllByLabel(), ExpectProduce(), pull), MakeCheckers(ExpectScanAllByLabel(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, MatchPathReturn) { // Test MATCH (n) -[r :relationship]- (m) RETURN n AstStorage storage; FakeDbAccessor dba; auto relationship = dba.EdgeType("relationship"); auto *as_n = NEXPR("n", IDENT("n")); QUERY(SINGLE_QUERY( MATCH(PATTERN(NODE("n"), EDGE("r", Direction::BOTH, {relationship}), NODE("m"))), RETURN(as_n))); auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_n)}); auto expected = ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), 
ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, MatchNamedPatternReturn) { // Test MATCH p = (n) -[r :relationship]- (m) RETURN p AstStorage storage; FakeDbAccessor dba; auto relationship = dba.EdgeType("relationship"); auto *as_p = NEXPR("p", IDENT("p")); QUERY(SINGLE_QUERY( MATCH(NAMED_PATTERN("p", NODE("n"), EDGE("r", Direction::BOTH, {relationship}), NODE("m"))), RETURN(as_p))); auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_p)}); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectConstructNamedPath(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectConstructNamedPath(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, MatchNamedPatternWithPredicateReturn) { // Test MATCH p = (n) -[r :relationship]- (m) WHERE 2 = p RETURN p AstStorage storage; FakeDbAccessor dba; auto relationship = dba.EdgeType("relationship"); auto *as_p = NEXPR("p", IDENT("p")); QUERY(SINGLE_QUERY( MATCH(NAMED_PATTERN("p", NODE("n"), EDGE("r", Direction::BOTH, {relationship}), NODE("m"))), WHERE(EQ(LITERAL(2), IDENT("p"))), RETURN(as_p))); auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(), ExpectConstructNamedPath(), ExpectFilter(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_p)}); auto expected = ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectConstructNamedPath(), ExpectFilter(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectConstructNamedPath(), 
ExpectFilter(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, OptionalMatchNamedPatternReturn) { // Test OPTIONAL MATCH p = (n) -[r]- (m) RETURN p AstStorage storage; auto node_n = NODE("n"); auto edge = EDGE("r"); auto node_m = NODE("m"); auto pattern = NAMED_PATTERN("p", node_n, edge, node_m); auto as_p = AS("p"); QUERY(SINGLE_QUERY(OPTIONAL_MATCH(pattern), RETURN("p", as_p))); auto symbol_table = MakeSymbolTable(*storage.query()); auto get_symbol = [&symbol_table](const auto *ast_node) { return symbol_table.at(*ast_node->identifier_); }; std::vector optional_symbols{get_symbol(pattern), get_symbol(node_n), get_symbol(edge), get_symbol(node_m)}; FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); { std::list optional{new ExpectScanAll(), new ExpectExpand(), new ExpectConstructNamedPath()}; CheckPlan(planner.plan(), symbol_table, ExpectOptional(optional_symbols, optional), ExpectProduce()); } { std::list optional{ new ExpectScanAll(), new ExpectDistributedExpand(), new ExpectConstructNamedPath(), new ExpectPullRemote(optional_symbols)}; auto expected = ExpectDistributed( MakeCheckers(ExpectOptional(optional_symbols, optional), ExpectProduce()), MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectConstructNamedPath())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } } TYPED_TEST(TestPlanner, MatchWhereReturn) { // Test MATCH (n) WHERE n.property < 42 RETURN n AstStorage storage; FakeDbAccessor dba; auto property = dba.Property("property"); auto *as_n = NEXPR("n", IDENT("n")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), WHERE(LESS(PROPERTY_LOOKUP("n", property), LITERAL(42))), RETURN(as_n))); auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectFilter(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_n)}); auto expected 
= ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, MatchDelete) { // Test MATCH (n) DELETE n AstStorage storage; QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), DELETE(IDENT("n")))); CheckPlan(storage, ExpectScanAll(), ExpectDelete()); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectDelete(), ExpectSynchronize()), MakeCheckers(ExpectScanAll(), ExpectDelete())); CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, MatchNodeSet) { // Test MATCH (n) SET n.prop = 42, n = n, n :label AstStorage storage; FakeDbAccessor dba; auto prop = dba.Property("prop"); auto label = dba.Label("label"); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), SET(PROPERTY_LOOKUP("n", prop), LITERAL(42)), SET("n", IDENT("n")), SET("n", {label}))); CheckPlan(storage, ExpectScanAll(), ExpectSetProperty(), ExpectSetProperties(), ExpectSetLabels()); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectSetProperty(), ExpectSetProperties(), ExpectSetLabels(), ExpectSynchronize()), MakeCheckers(ExpectScanAll(), ExpectSetProperty(), ExpectSetProperties(), ExpectSetLabels())); CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, MatchRemove) { // Test MATCH (n) REMOVE n.prop REMOVE n :label AstStorage storage; FakeDbAccessor dba; auto prop = dba.Property("prop"); auto label = dba.Label("label"); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), REMOVE(PROPERTY_LOOKUP("n", prop)), REMOVE("n", {label}))); CheckPlan(storage, ExpectScanAll(), ExpectRemoveProperty(), ExpectRemoveLabels()); auto expected = ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectRemoveProperty(), ExpectRemoveLabels(), ExpectSynchronize()), MakeCheckers(ExpectScanAll(), ExpectRemoveProperty(), ExpectRemoveLabels())); CheckDistributedPlan(storage, expected); } 
TYPED_TEST(TestPlanner, MatchMultiPattern) { // Test MATCH (n) -[r]- (m), (j) -[e]- (i) RETURN n AstStorage storage; QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m")), PATTERN(NODE("j"), EDGE("e"), NODE("i"))), RETURN("n"))); // We expect the expansions after the first to have a uniqueness filter in a // single MATCH clause. CheckPlan( storage, ExpectScanAll(), ExpectExpand(), ExpectScanAll(), ExpectExpand(), ExpectExpandUniquenessFilter(), ExpectProduce()); } TYPED_TEST(TestPlanner, MatchMultiPatternSameStart) { // Test MATCH (n), (n) -[e]- (m) RETURN n AstStorage storage; QUERY(SINGLE_QUERY( MATCH(PATTERN(NODE("n")), PATTERN(NODE("n"), EDGE("e"), NODE("m"))), RETURN("n"))); // We expect the second pattern to generate only an Expand, since another // ScanAll would be redundant. CheckPlan(storage, ExpectScanAll(), ExpectExpand(), ExpectProduce()); } TYPED_TEST(TestPlanner, MatchMultiPatternSameExpandStart) { // Test MATCH (n) -[r]- (m), (m) -[e]- (l) RETURN n AstStorage storage; QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m")), PATTERN(NODE("m"), EDGE("e"), NODE("l"))), RETURN("n"))); // We expect the second pattern to generate only an Expand. Another // ScanAll would be redundant, as it would generate the nodes obtained from // expansion. Additionally, a uniqueness filter is expected. 
CheckPlan(storage, ExpectScanAll(), ExpectExpand(), ExpectExpand(), ExpectExpandUniquenessFilter(), ExpectProduce()); } TYPED_TEST(TestPlanner, MultiMatch) { // Test MATCH (n) -[r]- (m) MATCH (j) -[e]- (i) -[f]- (h) RETURN n AstStorage storage; auto *node_n = NODE("n"); auto *edge_r = EDGE("r"); auto *node_m = NODE("m"); auto *node_j = NODE("j"); auto *edge_e = EDGE("e"); auto *node_i = NODE("i"); auto *edge_f = EDGE("f"); auto *node_h = NODE("h"); QUERY(SINGLE_QUERY(MATCH(PATTERN(node_n, edge_r, node_m)), MATCH(PATTERN(node_j, edge_e, node_i, edge_f, node_h)), RETURN("n"))); auto symbol_table = MakeSymbolTable(*storage.query()); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); // Multiple MATCH clauses form a Cartesian product, so the uniqueness should // not cross MATCH boundaries. CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(), ExpectScanAll(), ExpectExpand(), ExpectExpand(), ExpectExpandUniquenessFilter(), ExpectProduce()); auto get_symbol = [&symbol_table](const auto *atom_node) { return symbol_table.at(*atom_node->identifier_); }; ExpectPullRemote left_pull( {get_symbol(node_n), get_symbol(edge_r), get_symbol(node_m)}); auto left_cart = MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), left_pull); ExpectPullRemote right_pull({get_symbol(node_j), get_symbol(edge_e), get_symbol(node_i), get_symbol(edge_f), get_symbol(node_h)}); auto right_cart = MakeCheckers( ExpectScanAll(), ExpectDistributedExpand(), ExpectDistributedExpand(), ExpectExpandUniquenessFilter(), right_pull); auto expected = ExpectDistributed( MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectProduce()), MakeCheckers(ExpectScanAll(), ExpectDistributedExpand()), MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectDistributedExpand(), ExpectExpandUniquenessFilter())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, MultiMatchSameStart) { // Test MATCH (n) MATCH (n) -[r]- (m) RETURN 
n AstStorage storage; auto *as_n = NEXPR("n", IDENT("n")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m"))), RETURN(as_n))); // Similar to MatchMultiPatternSameStart, we expect only Expand from second // MATCH clause. auto symbol_table = MakeSymbolTable(*storage.query()); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_n)}); auto expected = ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, MatchWithReturn) { // Test MATCH (old) WITH old AS new RETURN new AstStorage storage; auto *as_new = NEXPR("new", IDENT("new")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("old"))), WITH("old", AS("new")), RETURN(as_new))); // No accumulation since we only do reads. 
auto symbol_table = MakeSymbolTable(*storage.query()); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_new)}); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, MatchWithWhereReturn) { // Test MATCH (old) WITH old AS new WHERE new.prop < 42 RETURN new FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; auto *as_new = NEXPR("new", IDENT("new")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("old"))), WITH("old", AS("new")), WHERE(LESS(PROPERTY_LOOKUP("new", prop), LITERAL(42))), RETURN(as_new))); // No accumulation since we only do reads. auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce(), ExpectFilter(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_new)}); auto expected = ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectFilter(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectFilter(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, CreateMultiExpand) { // Test CREATE (n) -[r :r]-> (m), (n) - [p :p]-> (l) FakeDbAccessor dba; auto r = dba.EdgeType("r"); auto p = dba.EdgeType("p"); AstStorage storage; QUERY(SINGLE_QUERY( CREATE(PATTERN(NODE("n"), EDGE("r", Direction::OUT, {r}), NODE("m")), PATTERN(NODE("n"), EDGE("p", Direction::OUT, {p}), NODE("l"))))); CheckPlan(storage, ExpectCreateNode(), ExpectCreateExpand(), ExpectCreateExpand()); ExpectedDistributedPlan expected{ MakeCheckers(ExpectDistributedCreateNode(true), 
ExpectDistributedCreateExpand(), ExpectDistributedCreateExpand(), ExpectSynchronize(false)), {}}; CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, MatchWithSumWhereReturn) { // Test MATCH (n) WITH SUM(n.prop) + 42 AS sum WHERE sum < 42 // RETURN sum AS result FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; auto sum = SUM(PROPERTY_LOOKUP("n", prop)); auto literal = LITERAL(42); QUERY(SINGLE_QUERY( MATCH(PATTERN(NODE("n"))), WITH(ADD(sum, literal), AS("sum")), WHERE(LESS(IDENT("sum"), LITERAL(42))), RETURN("sum", AS("result")))); auto aggr = ExpectAggregate({sum}, {literal}); CheckPlan(storage, ExpectScanAll(), aggr, ExpectProduce(), ExpectFilter(), ExpectProduce()); } TYPED_TEST(TestPlanner, MatchReturnSum) { // Test MATCH (n) RETURN SUM(n.prop1) AS sum, n.prop2 AS group FakeDbAccessor dba; auto prop1 = dba.Property("prop1"); auto prop2 = dba.Property("prop2"); AstStorage storage; auto sum = SUM(PROPERTY_LOOKUP("n", prop1)); auto n_prop2 = PROPERTY_LOOKUP("n", prop2); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), RETURN(sum, AS("sum"), n_prop2, AS("group")))); auto aggr = ExpectAggregate({sum}, {n_prop2}); auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), aggr, ExpectProduce()); { std::atomic next_plan_id{0}; auto distributed_plan = MakeDistributedPlan(planner.plan(), symbol_table, next_plan_id); auto merge_sum = SUM(IDENT("worker_sum")); auto master_aggr = ExpectMasterAggregate({merge_sum}, {n_prop2}); ExpectPullRemote pull( {symbol_table.at(*sum), symbol_table.at(*n_prop2->expression_)}); auto expected = ExpectDistributed(MakeCheckers(ExpectScanAll(), aggr, pull, master_aggr, ExpectProduce(), ExpectProduce()), MakeCheckers(ExpectScanAll(), aggr)); CheckDistributedPlan(distributed_plan, expected); } } TYPED_TEST(TestPlanner, CreateWithSum) { // Test CREATE (n) WITH SUM(n.prop) AS sum FakeDbAccessor dba; 
auto prop = dba.Property("prop"); AstStorage storage; auto n_prop = PROPERTY_LOOKUP("n", prop); auto sum = SUM(n_prop); auto query = QUERY(SINGLE_QUERY(CREATE(PATTERN(NODE("n"))), WITH(sum, AS("sum")))); auto symbol_table = MakeSymbolTable(*query); auto acc = ExpectAccumulate({symbol_table.at(*n_prop->expression_)}); auto aggr = ExpectAggregate({sum}, {}); auto planner = MakePlanner(dba, storage, symbol_table); // We expect both the accumulation and aggregation because the part before // WITH updates the database. CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc, aggr, ExpectProduce()); } TYPED_TEST(TestPlanner, MatchWithCreate) { // Test MATCH (n) WITH n AS a CREATE (a) -[r :r]-> (b) FakeDbAccessor dba; auto r_type = dba.EdgeType("r"); AstStorage storage; QUERY(SINGLE_QUERY( MATCH(PATTERN(NODE("n"))), WITH("n", AS("a")), CREATE( PATTERN(NODE("a"), EDGE("r", Direction::OUT, {r_type}), NODE("b"))))); CheckPlan(storage, ExpectScanAll(), ExpectProduce(), ExpectCreateExpand()); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectDistributedCreateExpand(), ExpectSynchronize()), MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectDistributedCreateExpand())); CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, MatchReturnSkipLimit) { // Test MATCH (n) RETURN n SKIP 2 LIMIT 1 AstStorage storage; auto *as_n = NEXPR("n", IDENT("n")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), RETURN(as_n, SKIP(LITERAL(2)), LIMIT(LITERAL(1))))); auto symbol_table = MakeSymbolTable(*storage.query()); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce(), ExpectSkip(), ExpectLimit()); ExpectPullRemote pull({symbol_table.at(*as_n)}); auto expected = ExpectDistributed(MakeCheckers(ExpectScanAll(), ExpectProduce(), pull, ExpectSkip(), ExpectLimit()), MakeCheckers(ExpectScanAll(), ExpectProduce())); CheckDistributedPlan(planner.plan(), 
symbol_table, expected); } TYPED_TEST(TestPlanner, CreateWithSkipReturnLimit) { // Test CREATE (n) WITH n AS m SKIP 2 RETURN m LIMIT 1 AstStorage storage; auto ident_n = IDENT("n"); auto query = QUERY(SINGLE_QUERY(CREATE(PATTERN(NODE("n"))), WITH(ident_n, AS("m"), SKIP(LITERAL(2))), RETURN("m", LIMIT(LITERAL(1))))); auto symbol_table = MakeSymbolTable(*query); auto acc = ExpectAccumulate({symbol_table.at(*ident_n)}); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); // Since we have a write query, we need to have Accumulate. This is a bit // different than Neo4j 3.0, which optimizes WITH followed by RETURN as a // single RETURN clause and then moves Skip and Limit before Accumulate. This // causes different behaviour. A newer version of Neo4j does the same thing as // us here (but who knows if they change it again). CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc, ExpectProduce(), ExpectSkip(), ExpectProduce(), ExpectLimit()); ExpectedDistributedPlan expected{ MakeCheckers(ExpectDistributedCreateNode(true), ExpectSynchronize(true), ExpectProduce(), ExpectSkip(), ExpectProduce(), ExpectLimit()), {}}; CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, CreateReturnSumSkipLimit) { // Test CREATE (n) RETURN SUM(n.prop) AS s SKIP 2 LIMIT 1 FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; auto n_prop = PROPERTY_LOOKUP("n", prop); auto sum = SUM(n_prop); auto query = QUERY( SINGLE_QUERY(CREATE(PATTERN(NODE("n"))), RETURN(sum, AS("s"), SKIP(LITERAL(2)), LIMIT(LITERAL(1))))); auto symbol_table = MakeSymbolTable(*query); auto acc = ExpectAccumulate({symbol_table.at(*n_prop->expression_)}); auto aggr = ExpectAggregate({sum}, {}); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc, aggr, ExpectProduce(), ExpectSkip(), ExpectLimit()); } TYPED_TEST(TestPlanner, MatchReturnOrderBy) { // Test MATCH (n) RETURN n 
AS m ORDER BY n.prop FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; auto *as_m = NEXPR("m", IDENT("n")); auto *node_n = NODE("n"); auto ret = RETURN(as_m, ORDER_BY(PROPERTY_LOOKUP("n", prop))); QUERY(SINGLE_QUERY(MATCH(PATTERN(node_n)), ret)); auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectProduce(), ExpectOrderBy()); ExpectPullRemoteOrderBy pull_order_by( {symbol_table.at(*as_m), symbol_table.at(*node_n->identifier_)}); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectOrderBy(), pull_order_by), MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectOrderBy())); CheckDistributedPlan(planner.plan(), symbol_table, expected); // Even though last operator pulls and orders by `m` and `n`, we expect only // `m` as the output of the query execution. EXPECT_THAT(planner.plan().OutputSymbols(symbol_table), testing::UnorderedElementsAre(symbol_table.at(*as_m))); } TYPED_TEST(TestPlanner, CreateWithOrderByWhere) { // Test CREATE (n) -[r :r]-> (m) // WITH n AS new ORDER BY new.prop, r.prop WHERE m.prop < 42 FakeDbAccessor dba; auto prop = dba.Property("prop"); auto r_type = dba.EdgeType("r"); AstStorage storage; auto ident_n = IDENT("n"); auto new_prop = PROPERTY_LOOKUP("new", prop); auto r_prop = PROPERTY_LOOKUP("r", prop); auto m_prop = PROPERTY_LOOKUP("m", prop); auto query = QUERY(SINGLE_QUERY( CREATE( PATTERN(NODE("n"), EDGE("r", Direction::OUT, {r_type}), NODE("m"))), WITH(ident_n, AS("new"), ORDER_BY(new_prop, r_prop)), WHERE(LESS(m_prop, LITERAL(42))))); auto symbol_table = MakeSymbolTable(*query); // Since this is a write query, we expect to accumulate to old used symbols. 
auto acc = ExpectAccumulate({ symbol_table.at(*ident_n), // `n` in WITH symbol_table.at(*r_prop->expression_), // `r` in ORDER BY symbol_table.at(*m_prop->expression_), // `m` in WHERE }); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), ExpectCreateExpand(), acc, ExpectProduce(), ExpectOrderBy(), ExpectFilter()); auto expected = ExpectDistributed( MakeCheckers(ExpectDistributedCreateNode(true), ExpectDistributedCreateExpand(), ExpectSynchronize(true), ExpectProduce(), ExpectOrderBy(), ExpectFilter())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, ReturnAddSumCountOrderBy) { // Test RETURN SUM(1) + COUNT(2) AS result ORDER BY result AstStorage storage; auto sum = SUM(LITERAL(1)); auto count = COUNT(LITERAL(2)); QUERY(SINGLE_QUERY( RETURN(ADD(sum, count), AS("result"), ORDER_BY(IDENT("result"))))); auto aggr = ExpectAggregate({sum, count}, {}); CheckPlan(storage, aggr, ExpectProduce(), ExpectOrderBy()); auto expected = ExpectDistributed(MakeCheckers(aggr, ExpectProduce(), ExpectOrderBy())); CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, MatchMerge) { // Test MATCH (n) MERGE (n) -[r :r]- (m) // ON MATCH SET n.prop = 42 ON CREATE SET m = n // RETURN n AS n FakeDbAccessor dba; auto r_type = dba.EdgeType("r"); auto prop = dba.Property("prop"); AstStorage storage; auto ident_n = IDENT("n"); auto query = QUERY(SINGLE_QUERY( MATCH(PATTERN(NODE("n"))), MERGE(PATTERN(NODE("n"), EDGE("r", Direction::BOTH, {r_type}), NODE("m")), ON_MATCH(SET(PROPERTY_LOOKUP("n", prop), LITERAL(42))), ON_CREATE(SET("m", IDENT("n")))), RETURN(ident_n, AS("n")))); std::list on_match{new ExpectExpand(), new ExpectSetProperty()}; std::list on_create{new ExpectCreateExpand(), new ExpectSetProperties()}; auto symbol_table = MakeSymbolTable(*query); // We expect Accumulate after Merge, because it is considered as a write. 
auto acc = ExpectAccumulate({symbol_table.at(*ident_n)}); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectMerge(on_match, on_create), acc, ExpectProduce()); for (auto &op : on_match) delete op; on_match.clear(); for (auto &op : on_create) delete op; on_create.clear(); } TYPED_TEST(TestPlanner, MatchOptionalMatchWhereReturn) { // Test MATCH (n) OPTIONAL MATCH (n) -[r]- (m) WHERE m.prop < 42 RETURN r FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), OPTIONAL_MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m"))), WHERE(LESS(PROPERTY_LOOKUP("m", prop), LITERAL(42))), RETURN("r"))); std::list optional{new ExpectScanAll(), new ExpectExpand(), new ExpectFilter()}; CheckPlan(storage, ExpectScanAll(), ExpectOptional(optional), ExpectProduce()); } TYPED_TEST(TestPlanner, MatchUnwindReturn) { // Test MATCH (n) UNWIND [1,2,3] AS x RETURN n, x AstStorage storage; auto *as_n = NEXPR("n", IDENT("n")); auto *as_x = NEXPR("x", IDENT("x")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), UNWIND(LIST(LITERAL(1), LITERAL(2), LITERAL(3)), AS("x")), RETURN(as_n, as_x))); auto symbol_table = MakeSymbolTable(*storage.query()); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectUnwind(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_n), symbol_table.at(*as_x)}); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectUnwind(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectUnwind(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, ReturnDistinctOrderBySkipLimit) { // Test RETURN DISTINCT 1 ORDER BY 1 SKIP 1 LIMIT 1 AstStorage storage; QUERY(SINGLE_QUERY(RETURN_DISTINCT(LITERAL(1), AS("1"), ORDER_BY(LITERAL(1)), SKIP(LITERAL(1)), LIMIT(LITERAL(1))))); CheckPlan(storage, 
ExpectProduce(), ExpectDistinct(), ExpectOrderBy(), ExpectSkip(), ExpectLimit()); auto expected = ExpectDistributed( MakeCheckers(ExpectProduce(), ExpectDistinct(), ExpectOrderBy(), ExpectSkip(), ExpectLimit())); CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, CreateWithDistinctSumWhereReturn) { // Test CREATE (n) WITH DISTINCT SUM(n.prop) AS s WHERE s < 42 RETURN s FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; auto node_n = NODE("n"); auto sum = SUM(PROPERTY_LOOKUP("n", prop)); auto query = QUERY(SINGLE_QUERY(CREATE(PATTERN(node_n)), WITH_DISTINCT(sum, AS("s")), WHERE(LESS(IDENT("s"), LITERAL(42))), RETURN("s"))); auto symbol_table = MakeSymbolTable(*query); auto acc = ExpectAccumulate({symbol_table.at(*node_n->identifier_)}); auto aggr = ExpectAggregate({sum}, {}); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectCreateNode(), acc, aggr, ExpectProduce(), ExpectDistinct(), ExpectFilter(), ExpectProduce()); } TYPED_TEST(TestPlanner, MatchCrossReferenceVariable) { // Test MATCH (n {prop: m.prop}), (m {prop: n.prop}) RETURN n FakeDbAccessor dba; auto prop = PROPERTY_PAIR("prop"); AstStorage storage; auto node_n = NODE("n"); auto m_prop = PROPERTY_LOOKUP("m", prop.second); node_n->properties_[prop] = m_prop; auto node_m = NODE("m"); auto n_prop = PROPERTY_LOOKUP("n", prop.second); node_m->properties_[prop] = n_prop; QUERY(SINGLE_QUERY(MATCH(PATTERN(node_n), PATTERN(node_m)), RETURN("n"))); // We expect both ScanAll to come before filters (2 are joined into one), // because they need to populate the symbol values. 
CheckPlan(storage, ExpectScanAll(), ExpectScanAll(), ExpectFilter(), ExpectProduce()); } TYPED_TEST(TestPlanner, MatchWhereBeforeExpand) { // Test MATCH (n) -[r]- (m) WHERE n.prop < 42 RETURN n FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; auto *as_n = NEXPR("n", IDENT("n")); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m"))), WHERE(LESS(PROPERTY_LOOKUP("n", prop), LITERAL(42))), RETURN(as_n))); // We expect Fitler to come immediately after ScanAll, since it only uses `n`. auto symbol_table = MakeSymbolTable(*storage.query()); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectFilter(), ExpectExpand(), ExpectProduce()); ExpectPullRemote pull({symbol_table.at(*as_n)}); auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectDistributedExpand(), ExpectProduce(), pull), MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectDistributedExpand(), ExpectProduce())); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, MultiMatchWhere) { // Test MATCH (n) -[r]- (m) MATCH (l) WHERE n.prop < 42 RETURN n FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m"))), MATCH(PATTERN(NODE("l"))), WHERE(LESS(PROPERTY_LOOKUP("n", prop), LITERAL(42))), RETURN("n"))); // Even though WHERE is in the second MATCH clause, we expect Filter to come // before second ScanAll, since it only uses the value from first ScanAll. 
CheckPlan(storage, ExpectScanAll(), ExpectFilter(), ExpectExpand(), ExpectScanAll(), ExpectProduce()); } TYPED_TEST(TestPlanner, MatchOptionalMatchWhere) { // Test MATCH (n) -[r]- (m) OPTIONAL MATCH (l) WHERE n.prop < 42 RETURN n FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m"))), OPTIONAL_MATCH(PATTERN(NODE("l"))), WHERE(LESS(PROPERTY_LOOKUP("n", prop), LITERAL(42))), RETURN("n"))); // Even though WHERE is in the second MATCH clause, and it uses the value from // first ScanAll, it must remain part of the Optional. It should come before // optional ScanAll. std::list optional{new ExpectFilter(), new ExpectScanAll()}; CheckPlan(storage, ExpectScanAll(), ExpectExpand(), ExpectOptional(optional), ExpectProduce()); } TYPED_TEST(TestPlanner, MatchReturnAsterisk) { // Test MATCH (n) -[e]- (m) RETURN *, m.prop FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; auto ret = RETURN(PROPERTY_LOOKUP("m", prop), AS("m.prop")); ret->body_.all_identifiers = true; auto query = QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), EDGE("e"), NODE("m"))), ret)); auto symbol_table = MakeSymbolTable(*query); auto planner = MakePlanner(dba, storage, symbol_table); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(), ExpectProduce()); std::vector output_names; for (const auto &output_symbol : planner.plan().OutputSymbols(symbol_table)) { output_names.emplace_back(output_symbol.name()); } std::vector expected_names{"e", "m", "n", "m.prop"}; EXPECT_EQ(output_names, expected_names); } TYPED_TEST(TestPlanner, MatchReturnAsteriskSum) { // Test MATCH (n) RETURN *, SUM(n.prop) AS s FakeDbAccessor dba; auto prop = dba.Property("prop"); AstStorage storage; auto sum = SUM(PROPERTY_LOOKUP("n", prop)); auto ret = RETURN(sum, AS("s")); ret->body_.all_identifiers = true; auto query = QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), ret)); auto symbol_table = MakeSymbolTable(*query); 
auto planner = MakePlanner(dba, storage, symbol_table); auto *produce = dynamic_cast(&planner.plan()); ASSERT_TRUE(produce); const auto &named_expressions = produce->named_expressions_; ASSERT_EQ(named_expressions.size(), 2); auto *expanded_ident = dynamic_cast(named_expressions[0]->expression_); ASSERT_TRUE(expanded_ident); auto aggr = ExpectAggregate({sum}, {expanded_ident}); CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), aggr, ExpectProduce()); std::vector output_names; for (const auto &output_symbol : planner.plan().OutputSymbols(symbol_table)) { output_names.emplace_back(output_symbol.name()); } std::vector expected_names{"n", "s"}; EXPECT_EQ(output_names, expected_names); } TYPED_TEST(TestPlanner, UnwindMergeNodeProperty) { // Test UNWIND [1] AS i MERGE (n {prop: i}) AstStorage storage; FakeDbAccessor dba; auto node_n = NODE("n"); node_n->properties_[PROPERTY_PAIR("prop")] = IDENT("i"); QUERY( SINGLE_QUERY(UNWIND(LIST(LITERAL(1)), AS("i")), MERGE(PATTERN(node_n)))); std::list on_match{new ExpectScanAll(), new ExpectFilter()}; std::list on_create{new ExpectCreateNode()}; CheckPlan(storage, ExpectUnwind(), ExpectMerge(on_match, on_create)); for (auto &op : on_match) delete op; for (auto &op : on_create) delete op; } TYPED_TEST(TestPlanner, MultipleOptionalMatchReturn) { // Test OPTIONAL MATCH (n) OPTIONAL MATCH (m) RETURN n AstStorage storage; QUERY(SINGLE_QUERY(OPTIONAL_MATCH(PATTERN(NODE("n"))), OPTIONAL_MATCH(PATTERN(NODE("m"))), RETURN("n"))); std::list optional{new ExpectScanAll()}; CheckPlan(storage, ExpectOptional(optional), ExpectOptional(optional), ExpectProduce()); } TYPED_TEST(TestPlanner, FunctionAggregationReturn) { // Test RETURN sqrt(SUM(2)) AS result, 42 AS group_by AstStorage storage; auto sum = SUM(LITERAL(2)); auto group_by_literal = LITERAL(42); QUERY(SINGLE_QUERY( RETURN(FN("sqrt", sum), AS("result"), group_by_literal, AS("group_by")))); auto aggr = ExpectAggregate({sum}, {group_by_literal}); CheckPlan(storage, aggr, 
ExpectProduce()); auto expected = ExpectDistributed(MakeCheckers(aggr, ExpectProduce())); CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, FunctionWithoutArguments) { // Test RETURN pi() AS pi AstStorage storage; QUERY(SINGLE_QUERY(RETURN(FN("pi"), AS("pi")))); CheckPlan(storage, ExpectProduce()); auto expected = ExpectDistributed(MakeCheckers(ExpectProduce())); CheckDistributedPlan(storage, expected); } TYPED_TEST(TestPlanner, ListLiteralAggregationReturn) { // Test RETURN [SUM(2)] AS result, 42 AS group_by AstStorage storage; auto sum = SUM(LITERAL(2)); auto group_by_literal = LITERAL(42); QUERY(SINGLE_QUERY( RETURN(LIST(sum), AS("result"), group_by_literal, AS("group_by")))); auto aggr = ExpectAggregate({sum}, {group_by_literal}); CheckPlan(storage, aggr, ExpectProduce()); } TYPED_TEST(TestPlanner, MapLiteralAggregationReturn) { // Test RETURN {sum: SUM(2)} AS result, 42 AS group_by AstStorage storage; FakeDbAccessor dba; auto sum = SUM(LITERAL(2)); auto group_by_literal = LITERAL(42); QUERY(SINGLE_QUERY(RETURN(MAP({PROPERTY_PAIR("sum"), sum}), AS("result"), group_by_literal, AS("group_by")))); auto aggr = ExpectAggregate({sum}, {group_by_literal}); CheckPlan(storage, aggr, ExpectProduce()); } TYPED_TEST(TestPlanner, EmptyListIndexAggregation) { // Test RETURN [][SUM(2)] AS result, 42 AS group_by AstStorage storage; auto sum = SUM(LITERAL(2)); auto empty_list = LIST(); auto group_by_literal = LITERAL(42); QUERY(SINGLE_QUERY( RETURN(storage.Create(empty_list, sum), AS("result"), group_by_literal, AS("group_by")))); // We expect to group by '42' and the empty list, because it is a // sub-expression of a binary operator which contains an aggregation. This is // similar to grouping by '1' in `RETURN 1 + SUM(2)`. 
auto aggr = ExpectAggregate({sum}, {empty_list, group_by_literal});
  CheckPlan(storage, aggr, ExpectProduce());
}

TYPED_TEST(TestPlanner, ListSliceAggregationReturn) {
  // Test RETURN [1, 2][0..SUM(2)] AS result, 42 AS group_by
  AstStorage storage;
  auto sum = SUM(LITERAL(2));
  auto list = LIST(LITERAL(1), LITERAL(2));
  auto group_by_literal = LITERAL(42);
  QUERY(SINGLE_QUERY(RETURN(SLICE(list, LITERAL(0), sum), AS("result"),
                            group_by_literal, AS("group_by"))));
  // Similarly to EmptyListIndexAggregation test, we expect grouping by list and
  // '42', because slicing is an operator.
  auto aggr = ExpectAggregate({sum}, {list, group_by_literal});
  CheckPlan(storage, aggr, ExpectProduce());
}

TYPED_TEST(TestPlanner, ListWithAggregationAndGroupBy) {
  // Test RETURN [sum(2), 42]
  AstStorage storage;
  auto sum = SUM(LITERAL(2));
  auto group_by_literal = LITERAL(42);
  QUERY(SINGLE_QUERY(RETURN(LIST(sum, group_by_literal), AS("result"))));
  // The literal sits in the same list as the aggregation and is the expected
  // group by expression.
  auto aggr = ExpectAggregate({sum}, {group_by_literal});
  CheckPlan(storage, aggr, ExpectProduce());
}

TYPED_TEST(TestPlanner, AggregatonWithListWithAggregationAndGroupBy) {
  // Test RETURN sum(2), [sum(3), 42]
  AstStorage storage;
  auto sum2 = SUM(LITERAL(2));
  auto sum3 = SUM(LITERAL(3));
  auto group_by_literal = LITERAL(42);
  QUERY(SINGLE_QUERY(
      RETURN(sum2, AS("sum2"), LIST(sum3, group_by_literal), AS("list"))));
  // Both aggregations are expected in a single Aggregate operator.
  auto aggr = ExpectAggregate({sum2, sum3}, {group_by_literal});
  CheckPlan(storage, aggr, ExpectProduce());
}

TYPED_TEST(TestPlanner, MapWithAggregationAndGroupBy) {
  // Test RETURN {lit: 42, sum: sum(2)}
  AstStorage storage;
  FakeDbAccessor dba;
  auto sum = SUM(LITERAL(2));
  auto group_by_literal = LITERAL(42);
  QUERY(SINGLE_QUERY(RETURN(MAP({PROPERTY_PAIR("sum"), sum},
                                {PROPERTY_PAIR("lit"), group_by_literal}),
                            AS("result"))));
  auto aggr = ExpectAggregate({sum}, {group_by_literal});
  CheckPlan(storage, aggr, ExpectProduce());
}

TYPED_TEST(TestPlanner, CreateIndex) {
  // Test CREATE INDEX ON :Label(property)
  FakeDbAccessor dba;
  auto label =
dba.Label("label");
  auto property = dba.Property("property");
  AstStorage storage;
  QUERY(SINGLE_QUERY(CREATE_INDEX_ON(label, property)));
  CheckPlan(storage, ExpectCreateIndex(label, property));
  // The distributed plan is expected to contain the same single operator.
  auto expected =
      ExpectDistributed(MakeCheckers(ExpectCreateIndex(label, property)));
  CheckDistributedPlan(storage, expected);
}

TYPED_TEST(TestPlanner, AtomIndexedLabelProperty) {
  // Test MATCH (n :label {property: 42, not_indexed: 0}) RETURN n
  AstStorage storage;
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto property = PROPERTY_PAIR("property");
  auto not_indexed = PROPERTY_PAIR("not_indexed");
  // Index counts are set for the label and for label+`property` only.
  dba.SetIndexCount(label, 1);
  dba.SetIndexCount(label, property.second, 1);
  auto node = NODE("n", label);
  auto lit_42 = LITERAL(42);
  node->properties_[property] = lit_42;
  node->properties_[not_indexed] = LITERAL(0);
  QUERY(SINGLE_QUERY(MATCH(PATTERN(node)), RETURN("n")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  // Expect a scan by `property` value followed by a Filter.
  CheckPlan(planner.plan(), symbol_table,
            ExpectScanAllByLabelPropertyValue(label, property, lit_42),
            ExpectFilter(), ExpectProduce());
}

TYPED_TEST(TestPlanner, AtomPropertyWhereLabelIndexing) {
  // Test MATCH (n {property: 42}) WHERE n.not_indexed AND n:label RETURN n
  AstStorage storage;
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto property = PROPERTY_PAIR("property");
  auto not_indexed = PROPERTY_PAIR("not_indexed");
  dba.SetIndexCount(label, property.second, 0);
  auto node = NODE("n");
  auto lit_42 = LITERAL(42);
  node->properties_[property] = lit_42;
  QUERY(SINGLE_QUERY(
      MATCH(PATTERN(node)),
      WHERE(AND(PROPERTY_LOOKUP("n", not_indexed),
                storage.Create(
                    IDENT("n"), std::vector{label}))),
      RETURN("n")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckPlan(planner.plan(), symbol_table,
            ExpectScanAllByLabelPropertyValue(label, property, lit_42),
            ExpectFilter(), ExpectProduce());
}

TYPED_TEST(TestPlanner,
WhereIndexedLabelProperty) {
  // Test MATCH (n :label) WHERE n.property = 42 RETURN n
  AstStorage storage;
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto property = PROPERTY_PAIR("property");
  dba.SetIndexCount(label, property.second, 0);
  auto lit_42 = LITERAL(42);
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", label))),
                     WHERE(EQ(PROPERTY_LOOKUP("n", property), lit_42)),
                     RETURN("n")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  // No Filter is expected after the scan by property value.
  CheckPlan(planner.plan(), symbol_table,
            ExpectScanAllByLabelPropertyValue(label, property, lit_42),
            ExpectProduce());
}

TYPED_TEST(TestPlanner, BestPropertyIndexed) {
  // Test MATCH (n :label) WHERE n.property = 1 AND n.better = 42 RETURN n
  AstStorage storage;
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto property = dba.Property("property");
  // Add a vertex with :label+property combination, so that the best
  // :label+better remains empty and thus better choice.
  dba.SetIndexCount(label, property, 1);
  auto better = PROPERTY_PAIR("better");
  dba.SetIndexCount(label, better.second, 0);
  auto lit_42 = LITERAL(42);
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", label))),
                     WHERE(AND(EQ(PROPERTY_LOOKUP("n", property), LITERAL(1)),
                               EQ(PROPERTY_LOOKUP("n", better), lit_42))),
                     RETURN("n")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  // The scan is expected to use `better`, followed by a Filter.
  CheckPlan(planner.plan(), symbol_table,
            ExpectScanAllByLabelPropertyValue(label, better, lit_42),
            ExpectFilter(), ExpectProduce());
}

TYPED_TEST(TestPlanner, MultiPropertyIndexScan) {
  // Test MATCH (n :label1), (m :label2) WHERE n.prop1 = 1 AND m.prop2 = 2
  // RETURN n, m
  FakeDbAccessor dba;
  auto label1 = dba.Label("label1");
  auto label2 = dba.Label("label2");
  auto prop1 = PROPERTY_PAIR("prop1");
  auto prop2 = PROPERTY_PAIR("prop2");
  dba.SetIndexCount(label1, prop1.second, 0);
  dba.SetIndexCount(label2, prop2.second, 0);
  AstStorage storage;
  auto lit_1 = LITERAL(1);
  auto lit_2 =
LITERAL(2);
  QUERY(SINGLE_QUERY(
      MATCH(PATTERN(NODE("n", label1)), PATTERN(NODE("m", label2))),
      WHERE(AND(EQ(PROPERTY_LOOKUP("n", prop1), lit_1),
                EQ(PROPERTY_LOOKUP("m", prop2), lit_2))),
      RETURN("n", "m")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  // Each node is expected to be scanned by its own label+property value.
  CheckPlan(planner.plan(), symbol_table,
            ExpectScanAllByLabelPropertyValue(label1, prop1, lit_1),
            ExpectScanAllByLabelPropertyValue(label2, prop2, lit_2),
            ExpectProduce());
}

TYPED_TEST(TestPlanner, WhereIndexedLabelPropertyRange) {
  // Test MATCH (n :label) WHERE n.property REL_OP 42 RETURN n
  // REL_OP is one of: `<`, `<=`, `>`, `>=`
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto property = dba.Property("property");
  dba.SetIndexCount(label, property, 0);
  AstStorage storage;
  auto lit_42 = LITERAL(42);
  auto n_prop = PROPERTY_LOOKUP("n", property);
  // Plans the query with `rel_expr` as the WHERE condition and checks that the
  // plan scans the label+property range with the given bounds.
  auto check_planned_range = [&label, &property, &dba](const auto &rel_expr,
                                                       auto lower_bound,
                                                       auto upper_bound) {
    // Shadow the first storage, so that the query is created in this one.
    AstStorage storage;
    QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", label))), WHERE(rel_expr),
                       RETURN("n")));
    auto symbol_table = MakeSymbolTable(*storage.query());
    auto planner = MakePlanner(dba, storage, symbol_table);
    CheckPlan(planner.plan(), symbol_table,
              ExpectScanAllByLabelPropertyRange(label, property, lower_bound,
                                                upper_bound),
              ExpectProduce());
  };
  {
    // Test relation operators which form an upper bound for range.
    std::vector> upper_bound_rel_op{
        std::make_pair(LESS(n_prop, lit_42), Bound::Type::EXCLUSIVE),
        std::make_pair(LESS_EQ(n_prop, lit_42), Bound::Type::INCLUSIVE),
        std::make_pair(GREATER(lit_42, n_prop), Bound::Type::EXCLUSIVE),
        std::make_pair(GREATER_EQ(lit_42, n_prop), Bound::Type::INCLUSIVE)};
    for (const auto &rel_op : upper_bound_rel_op) {
      check_planned_range(rel_op.first, std::experimental::nullopt,
                          Bound(lit_42, rel_op.second));
    }
  }
  {
    // Test relation operators which form a lower bound for range.
std::vector> lower_bound_rel_op{
        std::make_pair(LESS(lit_42, n_prop), Bound::Type::EXCLUSIVE),
        std::make_pair(LESS_EQ(lit_42, n_prop), Bound::Type::INCLUSIVE),
        std::make_pair(GREATER(n_prop, lit_42), Bound::Type::EXCLUSIVE),
        std::make_pair(GREATER_EQ(n_prop, lit_42), Bound::Type::INCLUSIVE)};
    for (const auto &rel_op : lower_bound_rel_op) {
      check_planned_range(rel_op.first, Bound(lit_42, rel_op.second),
                          std::experimental::nullopt);
    }
  }
}

TYPED_TEST(TestPlanner, UnableToUsePropertyIndex) {
  // Test MATCH (n: label) WHERE n.property = n.property RETURN n
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto property = dba.Property("property");
  dba.SetIndexCount(label, property, 0);
  AstStorage storage;
  QUERY(SINGLE_QUERY(
      MATCH(PATTERN(NODE("n", label))),
      WHERE(EQ(PROPERTY_LOOKUP("n", property), PROPERTY_LOOKUP("n", property))),
      RETURN("n")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  // We can only get ScanAllByLabelIndex, because we are comparing properties
  // with those on the same node.
  CheckPlan(planner.plan(), symbol_table, ExpectScanAllByLabel(),
            ExpectFilter(), ExpectProduce());
}

TYPED_TEST(TestPlanner, SecondPropertyIndex) {
  // Test MATCH (n :label), (m :label) WHERE m.property = n.property RETURN n
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto property = PROPERTY_PAIR("property");
  dba.SetIndexCount(label, dba.Property("property"), 0);
  AstStorage storage;
  auto n_prop = PROPERTY_LOOKUP("n", property);
  auto m_prop = PROPERTY_LOOKUP("m", property);
  QUERY(
      SINGLE_QUERY(MATCH(PATTERN(NODE("n", label)), PATTERN(NODE("m", label))),
                   WHERE(EQ(m_prop, n_prop)), RETURN("n")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckPlan(
      planner.plan(), symbol_table, ExpectScanAllByLabel(),
      // Note: We are scanning for m, therefore property should equal n_prop.
ExpectScanAllByLabelPropertyValue(label, property, n_prop),
      ExpectProduce());
}

TYPED_TEST(TestPlanner, ReturnSumGroupByAll) {
  // Test RETURN sum([1,2,3]), all(x in [1] where x = 1)
  AstStorage storage;
  auto sum = SUM(LIST(LITERAL(1), LITERAL(2), LITERAL(3)));
  auto *all = ALL("x", LIST(LITERAL(1)), WHERE(EQ(IDENT("x"), LITERAL(1))));
  QUERY(SINGLE_QUERY(RETURN(sum, AS("sum"), all, AS("all"))));
  // The `all` expression is the expected group by.
  auto aggr = ExpectAggregate({sum}, {all});
  CheckPlan(storage, aggr, ExpectProduce());
}

TYPED_TEST(TestPlanner, MatchExpandVariable) {
  // Test MATCH (n) -[r *..3]-> (m) RETURN r
  AstStorage storage;
  auto edge = EDGE_VARIABLE("r");
  edge->upper_bound_ = LITERAL(3);
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), edge, NODE("m"))), RETURN("r")));
  CheckPlan(storage, ExpectScanAll(), ExpectExpandVariable(), ExpectProduce());
}

TYPED_TEST(TestPlanner, MatchExpandVariableNoBounds) {
  // Test MATCH (n) -[r *]-> (m) RETURN r
  AstStorage storage;
  auto edge = EDGE_VARIABLE("r");
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), edge, NODE("m"))), RETURN("r")));
  CheckPlan(storage, ExpectScanAll(), ExpectExpandVariable(), ExpectProduce());
}

TYPED_TEST(TestPlanner, MatchExpandVariableInlinedFilter) {
  // Test MATCH (n) -[r :type * {prop: 42}]-> (m) RETURN r
  FakeDbAccessor dba;
  auto type = dba.EdgeType("type");
  auto prop = PROPERTY_PAIR("prop");
  AstStorage storage;
  auto edge = EDGE_VARIABLE("r", Direction::BOTH, {type});
  edge->properties_[prop] = LITERAL(42);
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), edge, NODE("m"))), RETURN("r")));
  CheckPlan(
      storage, ExpectScanAll(),
      ExpectExpandVariable(),  // Filter is both inlined and post-expand
      ExpectFilter(), ExpectProduce());
}

TYPED_TEST(TestPlanner, MatchExpandVariableNotInlinedFilter) {
  // Test MATCH (n) -[r :type * {prop: m.prop = 42}]-> (m) RETURN r
  FakeDbAccessor dba;
  auto type = dba.EdgeType("type");
  auto prop = PROPERTY_PAIR("prop");
  AstStorage storage;
  auto edge = EDGE_VARIABLE("r", Direction::BOTH, {type});
  edge->properties_[prop] =
EQ(PROPERTY_LOOKUP("m", prop), LITERAL(42));
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), edge, NODE("m"))), RETURN("r")));
  CheckPlan(storage, ExpectScanAll(), ExpectExpandVariable(), ExpectFilter(),
            ExpectProduce());
}

TYPED_TEST(TestPlanner, UnwindMatchVariable) {
  // Test UNWIND [1,2,3] AS d MATCH (n) -[r*d]-> (m) RETURN r
  AstStorage storage;
  auto edge = EDGE_VARIABLE("r", Direction::OUT);
  // Both bounds refer to the unwound identifier `d`.
  edge->lower_bound_ = IDENT("d");
  edge->upper_bound_ = IDENT("d");
  QUERY(SINGLE_QUERY(UNWIND(LIST(LITERAL(1), LITERAL(2), LITERAL(3)), AS("d")),
                     MATCH(PATTERN(NODE("n"), edge, NODE("m"))), RETURN("r")));
  CheckPlan(storage, ExpectUnwind(), ExpectScanAll(), ExpectExpandVariable(),
            ExpectProduce());
}

TYPED_TEST(TestPlanner, MatchBfs) {
  // Test MATCH (n) -[r:type *..10 (r, n|n)]-> (m) RETURN r
  FakeDbAccessor dba;
  auto edge_type = dba.EdgeType("type");
  AstStorage storage;
  auto *bfs = storage.Create(
      IDENT("r"), query::EdgeAtom::Type::BREADTH_FIRST, Direction::OUT,
      std::vector{edge_type});
  bfs->filter_lambda_.inner_edge = IDENT("r");
  bfs->filter_lambda_.inner_node = IDENT("n");
  bfs->filter_lambda_.expression = IDENT("n");
  bfs->upper_bound_ = LITERAL(10);
  auto *as_r = NEXPR("r", IDENT("r"));
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), bfs, NODE("m"))), RETURN(as_r)));
  auto symbol_table = MakeSymbolTable(*storage.query());
  {
    // Check the plan produced by the planner under test. Note that this `dba`
    // shadows the one declared at the top of the test.
    FakeDbAccessor dba;
    auto planner = MakePlanner(dba, storage, symbol_table);
    CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpandBfs(),
              ExpectProduce());
  }
  {
    // Check the distributed plan; the first checker list additionally ends
    // with a PullRemote of the returned symbol.
    ExpectPullRemote pull({symbol_table.at(*as_r)});
    auto expected = ExpectDistributed(
        MakeCheckers(ExpectScanAll(), ExpectDistributedExpandBfs(),
                     ExpectProduce(), pull),
        MakeCheckers(ExpectScanAll(), ExpectDistributedExpandBfs(),
                     ExpectProduce()));
    CheckDistributedPlan(storage, expected);
  }
}

TYPED_TEST(TestPlanner, MatchDoubleScanToExpandExisting) {
  // Test MATCH (n) -[r]- (m :label) RETURN r
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  AstStorage storage;
QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m", label))),
                     RETURN("r")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  // We expect 2x ScanAll and then Expand, since we are guessing that is
  // faster (due to low label index vertex count).
  CheckPlan(planner.plan(), symbol_table, ExpectScanAll(),
            ExpectScanAllByLabel(), ExpectExpand(), ExpectProduce());
}

TYPED_TEST(TestPlanner, MatchScanToExpand) {
  // Test MATCH (n) -[r]- (m :label {property: 1}) RETURN r
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto property = dba.Property("property");
  // Fill vertices to the max + 1.
  // Both the label+property and the label index counts are set one above
  // FLAGS_query_vertex_count_to_expand_existing.
  dba.SetIndexCount(label, property,
                    FLAGS_query_vertex_count_to_expand_existing + 1);
  dba.SetIndexCount(label, FLAGS_query_vertex_count_to_expand_existing + 1);
  AstStorage storage;
  auto node_m = NODE("m", label);
  node_m->properties_[std::make_pair("property", property)] = LITERAL(1);
  QUERY(
      SINGLE_QUERY(MATCH(PATTERN(NODE("n"), EDGE("r"), node_m)), RETURN("r")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto planner = MakePlanner(dba, storage, symbol_table);
  // We expect 1x ScanAll and then Expand, since we are guessing that
  // is faster (due to high label index vertex count).
  CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpand(),
            ExpectFilter(), ExpectProduce());
}

TYPED_TEST(TestPlanner, MatchWhereAndSplit) {
  // Test MATCH (n) -[r]- (m) WHERE n.prop AND r.prop RETURN m
  FakeDbAccessor dba;
  auto prop = PROPERTY_PAIR("prop");
  AstStorage storage;
  QUERY(SINGLE_QUERY(
      MATCH(PATTERN(NODE("n"), EDGE("r"), NODE("m"))),
      WHERE(AND(PROPERTY_LOOKUP("n", prop), PROPERTY_LOOKUP("r", prop))),
      RETURN("m")));
  // We expect `n.prop` filter right after scanning `n`.
CheckPlan(storage, ExpectScanAll(), ExpectFilter(), ExpectExpand(),
            ExpectFilter(), ExpectProduce());
}

TYPED_TEST(TestPlanner, ReturnAsteriskOmitsLambdaSymbols) {
  // Test MATCH (n) -[r* (ie, in | true)]- (m) RETURN *
  AstStorage storage;
  auto edge = EDGE_VARIABLE("r", Direction::BOTH);
  edge->filter_lambda_.inner_edge = IDENT("ie");
  edge->filter_lambda_.inner_node = IDENT("in");
  edge->filter_lambda_.expression = LITERAL(true);
  auto ret = storage.Create();
  // Mark the return body as `RETURN *`.
  ret->body_.all_identifiers = true;
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), edge, NODE("m"))), ret));
  auto symbol_table = MakeSymbolTable(*storage.query());
  FakeDbAccessor dba;
  auto planner = MakePlanner(dba, storage, symbol_table);
  auto *produce = dynamic_cast(&planner.plan());
  ASSERT_TRUE(produce);
  // Collect the names of all symbols the plan outputs.
  std::vector outputs;
  for (const auto &output_symbol : produce->OutputSymbols(symbol_table)) {
    outputs.emplace_back(output_symbol.name());
  }
  // We expect `*` expanded to `n`, `r` and `m`.
  EXPECT_EQ(outputs.size(), 3);
  for (const auto &name : {"n", "r", "m"}) {
    EXPECT_TRUE(utils::Contains(outputs, name));
  }
}

TYPED_TEST(TestPlanner, AuthQuery) {
  // Check if everything is properly forwarded from ast node to the operator
  FakeDbAccessor dba;
  AstStorage storage;
  QUERY(SINGLE_QUERY(AUTH_QUERY(query::AuthQuery::Action::DROP_ROLE, "user",
                                "role", "user_or_role", LITERAL("password"),
                                std::vector(
                                    {query::AuthQuery::Privilege::MATCH,
                                     query::AuthQuery::Privilege::AUTH}))));
  CheckPlan(
      storage,
      ExpectAuthHandler(query::AuthQuery::Action::DROP_ROLE, "user", "role",
                        "user_or_role", LITERAL("password"),
                        {query::AuthQuery::Privilege::MATCH,
                         query::AuthQuery::Privilege::AUTH}));
  // The distributed plan is checked against the same single operator.
  auto expected = ExpectDistributed(MakeCheckers(
      ExpectAuthHandler(query::AuthQuery::Action::DROP_ROLE, "user", "role",
                        "user_or_role", LITERAL("password"),
                        {query::AuthQuery::Privilege::MATCH,
                         query::AuthQuery::Privilege::AUTH})));
  CheckDistributedPlan(storage, expected);
}

TYPED_TEST(TestPlanner, CreateStream) {
  std::string stream_name("kafka"),
stream_uri("localhost:1234"),
      stream_topic("tropik"), transform_uri("localhost:1234/file.py");
  int64_t batch_interval_in_ms = 100;
  int64_t batch_size = 10;
  {
    // Neither the batch interval nor the batch size is given.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(CREATE_STREAM(stream_name, stream_uri, stream_topic,
                                     transform_uri, nullptr, nullptr)));
    auto expected = ExpectCreateStream(
        stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
        LITERAL(transform_uri), nullptr, nullptr);
    CheckPlan(storage, expected);
    auto expected_distributed =
        ExpectDistributed(MakeCheckers(ExpectCreateStream(
            stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
            LITERAL(transform_uri), nullptr, nullptr)));
    CheckDistributedPlan(storage, expected_distributed);
  }
  {
    // Only the batch interval is given.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(CREATE_STREAM(stream_name, stream_uri, stream_topic,
                                     transform_uri,
                                     LITERAL(batch_interval_in_ms), nullptr)));
    auto expected = ExpectCreateStream(
        stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
        LITERAL(transform_uri), LITERAL(batch_interval_in_ms), nullptr);
    CheckPlan(storage, expected);
    auto expected_distributed =
        ExpectDistributed(MakeCheckers(ExpectCreateStream(
            stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
            LITERAL(transform_uri), LITERAL(batch_interval_in_ms), nullptr)));
    CheckDistributedPlan(storage, expected_distributed);
  }
  {
    // Only the batch size is given.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(CREATE_STREAM(stream_name, stream_uri, stream_topic,
                                     transform_uri, nullptr,
                                     LITERAL(batch_size))));
    auto expected = ExpectCreateStream(
        stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
        LITERAL(transform_uri), nullptr, LITERAL(batch_size));
    CheckPlan(storage, expected);
    auto expected_distributed =
        ExpectDistributed(MakeCheckers(ExpectCreateStream(
            stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
            LITERAL(transform_uri), nullptr, LITERAL(batch_size))));
    CheckDistributedPlan(storage, expected_distributed);
  }
  {
    // Both the batch interval and the batch size are given.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(
CREATE_STREAM(stream_name, stream_uri, stream_topic, transform_uri,
                      LITERAL(batch_interval_in_ms), LITERAL(batch_size))));
    auto expected =
        ExpectCreateStream(stream_name, LITERAL(stream_uri),
                           LITERAL(stream_topic), LITERAL(transform_uri),
                           LITERAL(batch_interval_in_ms), LITERAL(batch_size));
    CheckPlan(storage, expected);
    auto expected_distributed =
        ExpectDistributed(MakeCheckers(ExpectCreateStream(
            stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
            LITERAL(transform_uri), LITERAL(batch_interval_in_ms),
            LITERAL(batch_size))));
    CheckDistributedPlan(storage, expected_distributed);
  }
}

TYPED_TEST(TestPlanner, DropStream) {
  // Checks the plan for dropping the stream named `stream_name`.
  std::string stream_name("kafka");
  FakeDbAccessor dba;
  AstStorage storage;
  QUERY(SINGLE_QUERY(DROP_STREAM(stream_name)));
  auto expected = ExpectDropStream(stream_name);
  CheckPlan(storage, expected);
  auto expected_distributed =
      ExpectDistributed(MakeCheckers(ExpectDropStream(stream_name)));
  CheckDistributedPlan(storage, expected_distributed);
}

TYPED_TEST(TestPlanner, ShowStreams) {
  // Checks the plan for showing streams.
  FakeDbAccessor dba;
  AstStorage storage;
  QUERY(SINGLE_QUERY(SHOW_STREAMS));
  auto expected = ExpectShowStreams();
  CheckPlan(storage, expected);
  auto expected_distributed =
      ExpectDistributed(MakeCheckers(ExpectShowStreams()));
  CheckDistributedPlan(storage, expected_distributed);
}

TYPED_TEST(TestPlanner, StartStopStream) {
  std::string stream_name("kafka");
  {
    // Start the stream without a batch limit.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(START_STREAM(stream_name, nullptr)));
    auto expected = ExpectStartStopStream(stream_name, true, nullptr);
    CheckPlan(storage, expected);
    auto expected_distributed = ExpectDistributed(
        MakeCheckers(ExpectStartStopStream(stream_name, true, nullptr)));
    CheckDistributedPlan(storage, expected_distributed);
  }
  {
    // Start the stream with a batch limit.
    FakeDbAccessor dba;
    AstStorage storage;
    auto limit_batches = LITERAL(10);
    QUERY(SINGLE_QUERY(START_STREAM(stream_name, limit_batches)));
    auto expected = ExpectStartStopStream(stream_name, true, limit_batches);
    CheckPlan(storage, expected);
    auto
expected_distributed = ExpectDistributed(
        MakeCheckers(ExpectStartStopStream(stream_name, true, limit_batches)));
    CheckDistributedPlan(storage, expected_distributed);
  }
  {
    // Stop the stream.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(STOP_STREAM(stream_name)));
    auto expected = ExpectStartStopStream(stream_name, false, nullptr);
    CheckPlan(storage, expected);
    auto expected_distributed = ExpectDistributed(
        MakeCheckers(ExpectStartStopStream(stream_name, false, nullptr)));
    CheckDistributedPlan(storage, expected_distributed);
  }
}

TYPED_TEST(TestPlanner, StartStopAllStreams) {
  {
    // Start all streams.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(START_ALL_STREAMS));
    auto expected = ExpectStartStopAllStreams(true);
    CheckPlan(storage, expected);
    auto expected_distributed =
        ExpectDistributed(MakeCheckers(ExpectStartStopAllStreams(true)));
    CheckDistributedPlan(storage, expected_distributed);
  }
  {
    // Stop all streams.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(STOP_ALL_STREAMS));
    auto expected = ExpectStartStopAllStreams(false);
    CheckPlan(storage, expected);
    auto expected_distributed =
        ExpectDistributed(MakeCheckers(ExpectStartStopAllStreams(false)));
    CheckDistributedPlan(storage, expected_distributed);
  }
}

TYPED_TEST(TestPlanner, TestStream) {
  std::string stream_name("kafka");
  {
    // Test the stream without a batch limit.
    FakeDbAccessor dba;
    AstStorage storage;
    QUERY(SINGLE_QUERY(TEST_STREAM(stream_name, nullptr)));
    auto expected = ExpectTestStream(stream_name, nullptr);
    CheckPlan(storage, expected);
    auto expected_distributed =
        ExpectDistributed(MakeCheckers(ExpectTestStream(stream_name, nullptr)));
    CheckDistributedPlan(storage, expected_distributed);
  }
  {
    // Test the stream with a batch limit.
    FakeDbAccessor dba;
    AstStorage storage;
    auto limit_batches = LITERAL(10);
    QUERY(SINGLE_QUERY(TEST_STREAM(stream_name, limit_batches)));
    auto expected = ExpectTestStream(stream_name, limit_batches);
    CheckPlan(storage, expected);
    auto expected_distributed = ExpectDistributed(
        MakeCheckers(ExpectTestStream(stream_name, limit_batches)));
    CheckDistributedPlan(storage, expected_distributed);
}
}

TYPED_TEST(TestPlanner, DistributedAvg) {
  // Test MATCH (n) RETURN AVG(n.prop) AS res
  AstStorage storage;
  FakeDbAccessor dba;
  auto prop = dba.Property("prop");
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))),
                     RETURN(AVG(PROPERTY_LOOKUP("n", prop)), AS("res"))));
  auto distributed_plan = MakeDistributedPlan(storage);
  auto &symbol_table = distributed_plan.symbol_table;
  auto worker_sum = SUM(PROPERTY_LOOKUP("n", prop));
  auto worker_count = COUNT(PROPERTY_LOOKUP("n", prop));
  {
    // Bind the locally created SUM and COUNT expressions to the output symbols
    // of the two aggregations found in the single worker plan.
    ASSERT_EQ(distributed_plan.worker_plans.size(), 1U);
    auto worker_plan = distributed_plan.worker_plans.back().second;
    auto worker_aggr_op = std::dynamic_pointer_cast(worker_plan);
    ASSERT_TRUE(worker_aggr_op);
    ASSERT_EQ(worker_aggr_op->aggregations_.size(), 2U);
    symbol_table[*worker_sum] = worker_aggr_op->aggregations_[0].output_sym;
    symbol_table[*worker_count] = worker_aggr_op->aggregations_[1].output_sym;
  }
  auto worker_aggr = ExpectAggregate({worker_sum, worker_count}, {});
  // The master-side aggregation is expected to SUM both worker results.
  auto merge_sum = SUM(IDENT("worker_sum"));
  auto merge_count = SUM(IDENT("worker_count"));
  auto master_aggr = ExpectMasterAggregate({merge_sum, merge_count}, {});
  ExpectPullRemote pull(
      {symbol_table.at(*worker_sum), symbol_table.at(*worker_count)});
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectScanAll(), worker_aggr, pull, master_aggr,
                   ExpectProduce(), ExpectProduce()),
      MakeCheckers(ExpectScanAll(), worker_aggr));
  CheckDistributedPlan(distributed_plan, expected);
}

TYPED_TEST(TestPlanner, DistributedCollectList) {
  // Test MATCH (n) RETURN COLLECT(n.prop) AS res
  AstStorage storage;
  FakeDbAccessor dba;
  auto prop = dba.Property("prop");
  auto node_n = NODE("n");
  auto collect = COLLECT_LIST(PROPERTY_LOOKUP("n", prop));
  QUERY(SINGLE_QUERY(MATCH(PATTERN(node_n)), RETURN(collect, AS("res"))));
  auto distributed_plan = MakeDistributedPlan(storage);
  auto &symbol_table = distributed_plan.symbol_table;
  auto aggr = ExpectAggregate({collect}, {});
  // The symbol of `n` itself is expected to be pulled before aggregating.
  ExpectPullRemote pull({symbol_table.at(*node_n->identifier_)});
  auto expected =
ExpectDistributed(
      MakeCheckers(ExpectScanAll(), pull, aggr, ExpectProduce()),
      MakeCheckers(ExpectScanAll()));
  CheckDistributedPlan(distributed_plan, expected);
}

TYPED_TEST(TestPlanner, DistributedMatchCreateReturn) {
  // Test MATCH (n) CREATE (m) RETURN m
  AstStorage storage;
  auto *ident_m = IDENT("m");
  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"))), CREATE(PATTERN(NODE("m"))),
                     RETURN(ident_m, AS("m"))));
  auto symbol_table = MakeSymbolTable(*storage.query());
  FakeDbAccessor dba;
  auto planner = MakePlanner(dba, storage, symbol_table);
  // The expected plan uses Synchronize on the symbol of `m`; an
  // ExpectAccumulate checker used to be built here but was never passed to any
  // check, so it has been dropped.
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectScanAll(), ExpectDistributedCreateNode(),
                   ExpectSynchronize({symbol_table.at(*ident_m)}),
                   ExpectProduce()),
      MakeCheckers(ExpectScanAll(), ExpectDistributedCreateNode()));
  CheckDistributedPlan(planner.plan(), symbol_table, expected);
}

TYPED_TEST(TestPlanner, DistributedCartesianCreateExpand) {
  // Test MATCH (a), (b) CREATE (a)-[e:r]->(b) RETURN e
  AstStorage storage;
  FakeDbAccessor dba;
  auto relationship = dba.EdgeType("r");
  auto *node_a = NODE("a");
  auto *node_b = NODE("b");
  QUERY(SINGLE_QUERY(
      MATCH(PATTERN(node_a), PATTERN(node_b)),
      CREATE(PATTERN(NODE("a"), EDGE("e", Direction::OUT, {relationship}),
                     NODE("b"))),
      RETURN("e")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  // Each Cartesian branch is expected to scan one node and pull its symbol.
  auto left_cart =
      MakeCheckers(ExpectScanAll(),
                   ExpectPullRemote({symbol_table.at(*node_a->identifier_)}));
  auto right_cart =
      MakeCheckers(ExpectScanAll(),
                   ExpectPullRemote({symbol_table.at(*node_b->identifier_)}));
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectCartesian(left_cart, right_cart),
                   ExpectDistributedCreateExpand(), ExpectSynchronize(false),
                   ExpectProduce()),
      MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckDistributedPlan(planner.plan(), symbol_table, expected);
}

TYPED_TEST(TestPlanner, DistributedCartesianExpand) {
  // Test MATCH (a), (b)-[e]-(c) RETURN c
AstStorage storage;
  auto *node_a = NODE("a");
  auto *node_b = NODE("b");
  auto *edge_e = EDGE("e");
  auto *node_c = NODE("c");
  QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b, edge_e, node_c)),
                     RETURN("c")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto sym_a = symbol_table.at(*node_a->identifier_);
  auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
  auto sym_b = symbol_table.at(*node_b->identifier_);
  auto sym_e = symbol_table.at(*edge_e->identifier_);
  auto sym_c = symbol_table.at(*node_c->identifier_);
  // The expand is expected inside the right branch, before its PullRemote.
  auto right_cart =
      MakeCheckers(ExpectScanAll(), ExpectDistributedExpand(),
                   ExpectPullRemote({sym_b, sym_e, sym_c}));
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectProduce()),
      MakeCheckers(ExpectScanAll()),
      MakeCheckers(ExpectScanAll(), ExpectDistributedExpand()));
  FakeDbAccessor dba;
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckDistributedPlan(planner.plan(), symbol_table, expected);
}

TYPED_TEST(TestPlanner, DistributedCartesianExpandToExisting) {
  // Test MATCH (a), (b)-[e]-(a) RETURN e
  AstStorage storage;
  auto *node_a = NODE("a");
  auto *node_b = NODE("b");
  QUERY(SINGLE_QUERY(
      MATCH(PATTERN(node_a), PATTERN(node_b, EDGE("e"), NODE("a"))),
      RETURN("e")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto sym_a = symbol_table.at(*node_a->identifier_);
  auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
  auto sym_b = symbol_table.at(*node_b->identifier_);
  auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
  // Here the expand is expected after the Cartesian, not inside a branch.
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectCartesian(left_cart, right_cart),
                   ExpectDistributedExpand(), ExpectProduce()),
      MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
  FakeDbAccessor dba;
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckDistributedPlan(planner.plan(), symbol_table, expected);
}

TYPED_TEST(TestPlanner, DistributedCartesianExpandFromExisting) {
  // Test MATCH (a), (b), (a)-[e]-(b) RETURN e
  AstStorage storage;
  auto *node_a = NODE("a");
  auto *node_b = NODE("b");
  QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b),
                           PATTERN(NODE("a"), EDGE("e"), NODE("b"))),
                     RETURN("e")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto sym_a = symbol_table.at(*node_a->identifier_);
  auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
  auto sym_b = symbol_table.at(*node_b->identifier_);
  auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectCartesian(left_cart, right_cart),
                   ExpectDistributedExpand(), ExpectProduce()),
      MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
  FakeDbAccessor dba;
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckDistributedPlan(planner.plan(), symbol_table, expected);
}

TYPED_TEST(TestPlanner, DistributedCartesianFilter) {
  // Test MATCH (a), (b), (c) WHERE a = 42 AND b = a AND c = b RETURN c
  AstStorage storage;
  auto *node_a = NODE("a");
  auto *node_b = NODE("b");
  auto *node_c = NODE("c");
  QUERY(SINGLE_QUERY(
      MATCH(PATTERN(node_a), PATTERN(node_b), PATTERN(node_c)),
      WHERE(AND(AND(EQ(IDENT("a"), LITERAL(42)), EQ(IDENT("b"), IDENT("a"))),
                EQ(IDENT("c"), IDENT("b")))),
      RETURN("c")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto sym_a = symbol_table.at(*node_a->identifier_);
  auto sym_b = symbol_table.at(*node_b->identifier_);
  auto sym_c = symbol_table.at(*node_c->identifier_);
  // A Filter is expected in the left branch, after the nested Cartesian of the
  // other two scans, and after the outer Cartesian.
  auto left_cart =
      MakeCheckers(ExpectScanAll(), ExpectFilter(), ExpectPullRemote({sym_a}));
  auto mid_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
  auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_c}));
  auto mid_right_cart =
      MakeCheckers(ExpectCartesian(mid_cart, right_cart), ExpectFilter());
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectCartesian(left_cart, mid_right_cart), ExpectFilter(),
                   ExpectProduce()),
      MakeCheckers(ExpectScanAll(),
ExpectFilter()),
      MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
  FakeDbAccessor dba;
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckDistributedPlan(planner.plan(), symbol_table, expected);
}

TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByProperty) {
  // Test MATCH (a), (b :label) WHERE b.prop = a RETURN b
  AstStorage storage;
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto prop = dba.Property("prop");
  // Set indexes so that lookup by property is preferred.
  dba.SetIndexCount(label, 1024);
  dba.SetIndexCount(label, prop, 0);
  auto *node_a = NODE("a");
  auto *node_b = NODE("b", label);
  QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b)),
                     WHERE(EQ(PROPERTY_LOOKUP("b", prop), IDENT("a"))),
                     RETURN("b")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto sym_a = symbol_table.at(*node_a->identifier_);
  auto sym_b = symbol_table.at(*node_b->identifier_);
  auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
  // We still expect only indexed lookup by label because property depends on
  // Cartesian branch.
  auto right_cart =
      MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
  // The property comparison is expected as a Filter after the Cartesian.
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(),
                   ExpectProduce()),
      MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckDistributedPlan(planner.plan(), symbol_table, expected);
}

TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByLowerBound) {
  // Test MATCH (a), (b :label) WHERE a < b.prop RETURN b
  AstStorage storage;
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto prop = dba.Property("prop");
  // Set indexes so that lookup by property is preferred.
dba.SetIndexCount(label, 1024);
  dba.SetIndexCount(label, prop, 0);
  auto *node_a = NODE("a");
  auto *node_b = NODE("b", label);
  QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b)),
                     WHERE(LESS(IDENT("a"), PROPERTY_LOOKUP("b", prop))),
                     RETURN("b")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto sym_a = symbol_table.at(*node_a->identifier_);
  auto sym_b = symbol_table.at(*node_b->identifier_);
  auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
  // We still expect only indexed lookup by label because lower bound depends on
  // Cartesian branch.
  auto right_cart =
      MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
  auto expected = ExpectDistributed(
      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(),
                   ExpectProduce()),
      MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
  auto planner = MakePlanner(dba, storage, symbol_table);
  CheckDistributedPlan(planner.plan(), symbol_table, expected);
}

TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByUpperBound) {
  // Test MATCH (a), (b :label) WHERE a > b.prop RETURN b
  AstStorage storage;
  FakeDbAccessor dba;
  auto label = dba.Label("label");
  auto prop = dba.Property("prop");
  // Set indexes so that lookup by property is preferred.
  dba.SetIndexCount(label, 1024);
  dba.SetIndexCount(label, prop, 0);
  auto *node_a = NODE("a");
  auto *node_b = NODE("b", label);
  QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b)),
                     WHERE(GREATER(IDENT("a"), PROPERTY_LOOKUP("b", prop))),
                     RETURN("b")));
  auto symbol_table = MakeSymbolTable(*storage.query());
  auto sym_a = symbol_table.at(*node_a->identifier_);
  auto sym_b = symbol_table.at(*node_b->identifier_);
  auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}));
  // We still expect only indexed lookup by label because upper bound depends on
  // Cartesian branch.
auto right_cart = MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b})); auto expected = ExpectDistributed( MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(), ExpectProduce()), MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel())); auto planner = MakePlanner(dba, storage, symbol_table); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TEST(TestPlanner, DistributedCartesianIndexedScanByBothBounds) { // Test MATCH (a), (b :label) WHERE a > b.prop > a RETURN b AstStorage storage; FakeDbAccessor dba; auto label = dba.Label("label"); auto prop = dba.Property("prop"); // Set indexes so that lookup by property is preferred. dba.SetIndexCount(label, 1024); dba.SetIndexCount(label, prop, 0); SymbolTable symbol_table; auto sym_a = symbol_table.CreateSymbol("a", true); auto scan_a = std::make_shared(nullptr, sym_a); auto sym_b = symbol_table.CreateSymbol("b", true); query::Expression *lower_expr = IDENT("a"); symbol_table[*lower_expr] = sym_a; auto lower_bound = utils::MakeBoundExclusive(lower_expr); query::Expression *upper_expr = IDENT("a"); symbol_table[*upper_expr] = sym_a; auto upper_bound = utils::MakeBoundExclusive(upper_expr); auto scan_b = std::make_shared( scan_a, sym_b, label, prop, lower_bound, upper_bound); auto ident_b = IDENT("b"); symbol_table[*ident_b] = sym_b; auto as_b = NEXPR("b", ident_b); auto produce = std::make_shared( scan_b, std::vector{as_b}); auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a})); // We still expect only indexed lookup by label because both bounds depend on // Cartesian branch. 
auto right_cart = MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b})); auto expected = ExpectDistributed( MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(), ExpectProduce()), MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel())); CheckDistributedPlan(*produce, symbol_table, expected); } TEST(TestPlanner, DistributedCartesianIndexedScanByLowerWithBothBounds) { // Test MATCH (a), (b :label) WHERE a > b.prop > 42 RETURN b AstStorage storage; FakeDbAccessor dba; auto label = dba.Label("label"); auto prop = dba.Property("prop"); // Set indexes so that lookup by property is preferred. dba.SetIndexCount(label, 1024); dba.SetIndexCount(label, prop, 0); SymbolTable symbol_table; auto sym_a = symbol_table.CreateSymbol("a", true); auto scan_a = std::make_shared(nullptr, sym_a); auto sym_b = symbol_table.CreateSymbol("b", true); query::Expression *lower_expr = LITERAL(42); auto lower_bound = utils::MakeBoundExclusive(lower_expr); query::Expression *upper_expr = IDENT("a"); symbol_table[*upper_expr] = sym_a; auto upper_bound = utils::MakeBoundExclusive(upper_expr); auto scan_b = std::make_shared( scan_a, sym_b, label, prop, lower_bound, upper_bound); auto ident_b = IDENT("b"); symbol_table[*ident_b] = sym_b; auto as_b = NEXPR("b", ident_b); auto produce = std::make_shared( scan_b, std::vector{as_b}); auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a})); // We still expect indexed lookup by label property range above lower bound, // because upper bound depends on Cartesian branch. 
auto right_cart = MakeCheckers(ExpectScanAllByLabelPropertyRange( label, prop, lower_bound, std::experimental::nullopt), ExpectPullRemote({sym_b})); auto expected = ExpectDistributed( MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(), ExpectProduce()), MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabelPropertyRange( label, prop, lower_bound, std::experimental::nullopt))); CheckDistributedPlan(*produce, symbol_table, expected); } TEST(TestPlanner, DistributedCartesianIndexedScanByUpperWithBothBounds) { // Test MATCH (a), (b :label) WHERE 42 > b.prop > a RETURN b AstStorage storage; FakeDbAccessor dba; auto label = dba.Label("label"); auto prop = dba.Property("prop"); // Set indexes so that lookup by property is preferred. dba.SetIndexCount(label, 1024); dba.SetIndexCount(label, prop, 0); SymbolTable symbol_table; auto sym_a = symbol_table.CreateSymbol("a", true); auto scan_a = std::make_shared(nullptr, sym_a); auto sym_b = symbol_table.CreateSymbol("b", true); query::Expression *lower_expr = IDENT("a"); symbol_table[*lower_expr] = sym_a; auto lower_bound = utils::MakeBoundExclusive(lower_expr); query::Expression *upper_expr = LITERAL(42); auto upper_bound = utils::MakeBoundExclusive(upper_expr); auto scan_b = std::make_shared( scan_a, sym_b, label, prop, lower_bound, upper_bound); auto ident_b = IDENT("b"); symbol_table[*ident_b] = sym_b; auto as_b = NEXPR("b", ident_b); auto produce = std::make_shared( scan_b, std::vector{as_b}); auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a})); // We still expect indexed lookup by label property range below upper bound, // because lower bound depends on Cartesian branch. 
auto right_cart = MakeCheckers(ExpectScanAllByLabelPropertyRange( label, prop, std::experimental::nullopt, upper_bound), ExpectPullRemote({sym_b})); auto expected = ExpectDistributed( MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(), ExpectProduce()), MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabelPropertyRange( label, prop, std::experimental::nullopt, upper_bound))); CheckDistributedPlan(*produce, symbol_table, expected); } TYPED_TEST(TestPlanner, DistributedCartesianProduce) { // Test MATCH (a) WITH a MATCH (b) WHERE b = a RETURN b; AstStorage storage; auto *with_a = WITH("a"); auto *node_b = NODE("b"); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("a"))), with_a, MATCH(PATTERN(node_b)), WHERE(EQ(IDENT("b"), IDENT("a"))), RETURN("b"))); auto symbol_table = MakeSymbolTable(*storage.query()); auto sym_a = symbol_table.at(*with_a->body_.named_expressions[0]); auto left_cart = MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectPullRemote({sym_a})); auto sym_b = symbol_table.at(*node_b->identifier_); auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b})); auto expected = ExpectDistributed(MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(), ExpectProduce()), MakeCheckers(ExpectScanAll(), ExpectProduce()), MakeCheckers(ExpectScanAll())); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, DistributedCartesianUnwind) { // Test MATCH (a), (b) UNWIND a AS x RETURN x AstStorage storage; auto *node_a = NODE("a"); auto *node_b = NODE("b"); QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a), PATTERN(node_b)), UNWIND(IDENT("a"), AS("x")), RETURN("x"))); auto symbol_table = MakeSymbolTable(*storage.query()); auto sym_a = symbol_table.at(*node_a->identifier_); auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a})); auto sym_b = symbol_table.at(*node_b->identifier_); auto right_cart = 
MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b})); auto expected = ExpectDistributed( MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectUnwind(), ExpectProduce()), MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll())); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, DistributedCartesianCreateNode) { // Test MATCH (a) CREATE (b) WITH b MATCH (c) CREATE (d) AstStorage storage; auto *node_b = NODE("b"); auto *node_c = NODE("c"); QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("a"))), CREATE(PATTERN(node_b)), WITH("b"), MATCH(PATTERN(node_c)), CREATE(PATTERN(NODE("d"))))); auto symbol_table = MakeSymbolTable(*storage.query()); auto sym_b = symbol_table.at(*node_b->identifier_); auto left_cart = MakeCheckers(ExpectScanAll(), ExpectDistributedCreateNode(), ExpectSynchronize({sym_b}, true), ExpectProduce()); auto sym_c = symbol_table.at(*node_c->identifier_); auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_c})); auto expected = ExpectDistributed( MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectDistributedCreateNode(true), ExpectSynchronize(false)), MakeCheckers(ExpectScanAll(), ExpectDistributedCreateNode()), MakeCheckers(ExpectScanAll())); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, DistributedOptionalExpand) { // Test MATCH (n) OPTIONAL MATCH (n)-[e]-(m) RETURN e; AstStorage storage; auto *node_n = NODE("n"); auto *edge_e = EDGE("e"); auto *node_m = NODE("m"); auto *ret_e = RETURN("e"); QUERY(SINGLE_QUERY(MATCH(PATTERN(node_n)), OPTIONAL_MATCH(PATTERN(node_n, edge_e, node_m)), ret_e)); auto symbol_table = MakeSymbolTable(*storage.query()); auto sym_e = symbol_table.at(*ret_e->body_.named_expressions[0]); std::list optional{new ExpectDistributedExpand()}; auto expected = ExpectDistributed( 
MakeCheckers(ExpectScanAll(), ExpectOptional(optional), ExpectProduce(), ExpectPullRemote({sym_e})), MakeCheckers(ExpectScanAll(), ExpectOptional(optional), ExpectProduce())); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, DistributedOptionalCartesian) { // Test MATCH (a) OPTIONAL MATCH (b), (c) WHERE b > a RETURN c; AstStorage storage; auto *node_a = NODE("a"); auto *node_b = NODE("b"); auto *node_c = NODE("c"); QUERY(SINGLE_QUERY( MATCH(PATTERN(node_a)), OPTIONAL_MATCH(PATTERN(node_b), PATTERN(node_c)), WHERE(GREATER(node_b->identifier_, node_a->identifier_)), RETURN("c"))); auto symbol_table = MakeSymbolTable(*storage.query()); auto sym_a = symbol_table.at(*node_a->identifier_); auto sym_b = symbol_table.at(*node_b->identifier_); auto sym_c = symbol_table.at(*node_c->identifier_); auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b})); auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_c})); std::list optional{ new ExpectCartesian(left_cart, right_cart), new ExpectFilter()}; auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}), ExpectOptional(optional), ExpectProduce()), MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll())); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TYPED_TEST(TestPlanner, DistributedOptionalScanExpandExisting) { // Test MATCH (a) OPTIONAL MATCH (b)-[e]-(a) RETURN e; AstStorage storage; auto *node_a = NODE("a"); auto *node_b = NODE("b"); QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a)), OPTIONAL_MATCH(PATTERN(node_b, EDGE("e"), NODE("a"))), RETURN("e"))); auto symbol_table = MakeSymbolTable(*storage.query()); auto sym_a = symbol_table.at(*node_a->identifier_); auto sym_b = symbol_table.at(*node_b->identifier_); std::list 
optional{new ExpectScanAll(), new ExpectPullRemote({sym_b}), new ExpectDistributedExpand()}; auto expected = ExpectDistributed( MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}), ExpectOptional(optional), ExpectProduce()), MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll())); FakeDbAccessor dba; auto planner = MakePlanner(dba, storage, symbol_table); CheckDistributedPlan(planner.plan(), symbol_table, expected); } TEST(CapnpSerial, Union) { std::vector left_symbols{ Symbol("symbol", 1, true, Symbol::Type::Edge)}; std::vector right_symbols{ Symbol("symbol", 3, true, Symbol::Type::Any)}; auto union_symbols = right_symbols; auto union_op = std::make_unique(nullptr, nullptr, union_symbols, left_symbols, right_symbols); std::unique_ptr loaded_plan; ::capnp::MallocMessageBuilder message; SavePlan(*union_op, &message); AstStorage new_storage; std::tie(loaded_plan, new_storage) = LoadPlan(message.getRoot()); ASSERT_TRUE(loaded_plan); auto *loaded_op = dynamic_cast(loaded_plan.get()); ASSERT_TRUE(loaded_op); EXPECT_FALSE(loaded_op->left_op_); EXPECT_FALSE(loaded_op->right_op_); EXPECT_EQ(loaded_op->left_symbols_, left_symbols); EXPECT_EQ(loaded_op->right_symbols_, right_symbols); EXPECT_EQ(loaded_op->union_symbols_, union_symbols); } TEST(CapnpSerial, Cartesian) { std::vector left_symbols{ Symbol("left_symbol", 1, true, Symbol::Type::Edge)}; std::vector right_symbols{ Symbol("right_symbol", 3, true, Symbol::Type::Any)}; auto cartesian = std::make_unique(nullptr, left_symbols, nullptr, right_symbols); std::unique_ptr loaded_plan; ::capnp::MallocMessageBuilder message; SavePlan(*cartesian, &message); AstStorage new_storage; std::tie(loaded_plan, new_storage) = LoadPlan(message.getRoot()); ASSERT_TRUE(loaded_plan); auto *loaded_op = dynamic_cast(loaded_plan.get()); ASSERT_TRUE(loaded_op); EXPECT_FALSE(loaded_op->left_op_); EXPECT_FALSE(loaded_op->right_op_); EXPECT_EQ(loaded_op->left_symbols_, left_symbols); EXPECT_EQ(loaded_op->right_symbols_, right_symbols); } 
// NOTE(review): in the three tests below, std::make_unique, std::make_shared,
// std::unique_ptr, std::vector, dynamic_cast, std::dynamic_pointer_cast and
// getRoot appear without template arguments. This looks like the
// angle-bracketed parts were stripped from this text -- confirm against the
// repository before editing the code itself.

// Saves an operator built from (nullptr, nullptr, true) into a capnp message,
// loads it back with LoadPlan, and checks that the loaded operator has no
// input, no pull_remote_, and a true advance_command_.
// NOTE(review): operator type presumably Synchronize -- confirm.
TEST(CapnpSerial, Synchronize) {
  auto synchronize = std::make_unique(nullptr, nullptr, true);
  std::unique_ptr loaded_plan;
  ::capnp::MallocMessageBuilder message;
  SavePlan(*synchronize, &message);
  AstStorage new_storage;
  std::tie(loaded_plan, new_storage) = LoadPlan(message.getRoot());
  // Stop the test here if loading failed; the checks below dereference it.
  ASSERT_TRUE(loaded_plan);
  auto *loaded_op = dynamic_cast(loaded_plan.get());
  ASSERT_TRUE(loaded_op);
  EXPECT_FALSE(loaded_op->input());
  EXPECT_FALSE(loaded_op->pull_remote_);
  EXPECT_TRUE(loaded_op->advance_command_);
}

// Same save/load round trip for an operator built from a null input, the id
// 42 and one symbol; checks the input stays null and that plan_id_ and
// symbols_ are preserved.
// NOTE(review): operator type presumably PullRemote -- confirm.
TEST(CapnpSerial, PullRemote) {
  std::vector symbols{Symbol("symbol", 1, true, Symbol::Type::Edge)};
  auto pull_remote = std::make_unique(nullptr, 42, symbols);
  std::unique_ptr loaded_plan;
  ::capnp::MallocMessageBuilder message;
  SavePlan(*pull_remote, &message);
  AstStorage new_storage;
  std::tie(loaded_plan, new_storage) = LoadPlan(message.getRoot());
  ASSERT_TRUE(loaded_plan);
  auto *loaded_op = dynamic_cast(loaded_plan.get());
  ASSERT_TRUE(loaded_op);
  EXPECT_FALSE(loaded_op->input());
  EXPECT_EQ(loaded_op->plan_id_, 42);
  EXPECT_EQ(loaded_op->symbols_, symbols);
}

// Save/load round trip for an operator that, unlike the tests above, has a
// non-null input (`once`) and an order-by list holding one ascending
// identifier expression. Checks that the input, plan_id_, symbols_, the
// single order-by expression and its ordering all survive loading.
// NOTE(review): operator type presumably PullRemoteOrderBy with a Once input
// -- confirm.
TEST(CapnpSerial, PullRemoteOrderBy) {
  auto once = std::make_shared();
  // `storage` is not referenced by name below.
  // NOTE(review): IDENT presumably allocates from it -- confirm.
  AstStorage storage;
  std::vector symbols{
      Symbol("my_symbol", 2, true, Symbol::Type::Vertex, 3)};
  std::vector> order_by{
      {query::Ordering::ASC, IDENT("my_symbol")}};
  auto pull_remote_order_by =
      std::make_unique(once, 42, order_by, symbols);
  std::unique_ptr loaded_plan;
  ::capnp::MallocMessageBuilder message;
  SavePlan(*pull_remote_order_by, &message);
  AstStorage new_storage;
  std::tie(loaded_plan, new_storage) = LoadPlan(message.getRoot());
  ASSERT_TRUE(loaded_plan);
  auto *loaded_op = dynamic_cast(loaded_plan.get());
  ASSERT_TRUE(loaded_op);
  // The loaded input must be non-null and of the expected dynamic type.
  ASSERT_TRUE(std::dynamic_pointer_cast(loaded_op->input()));
  EXPECT_EQ(loaded_op->plan_id_, 42);
  EXPECT_EQ(loaded_op->symbols_, symbols);
  // Size is asserted first so that indexing element 0 below is safe.
  ASSERT_EQ(loaded_op->order_by_.size(), 1);
  EXPECT_TRUE(dynamic_cast(loaded_op->order_by_[0]));
  ASSERT_EQ(loaded_op->compare_.ordering().size(), 1);
  EXPECT_EQ(loaded_op->compare_.ordering()[0], query::Ordering::ASC);
}

}  // namespace