From 30506f44f59244bb53a71f15439adfcd9855b1e4 Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Mon, 23 Apr 2018 16:23:42 +0200 Subject: [PATCH] Use LCP to generate LogicalOperator boost serialization Summary: Since we are moving from boost to Capnp for serialization, it makes sense to keep all of the LogicalOperator classes in LCP format. This will make it easier to generate Capnp code. Depends on D1361 Reviewers: buda, mferencevic, msantl, dgleich, ipaljak, mculinovic, mtomic Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1362 --- .gitignore | 3 + src/CMakeLists.txt | 2 + src/lisp/lcp.lisp | 23 +- src/query/plan/operator.cpp | 4 - src/query/plan/operator.hpp | 2724 ----------------------------------- src/query/plan/operator.lcp | 2147 +++++++++++++++++++++++++++ 6 files changed, 2164 insertions(+), 2739 deletions(-) delete mode 100644 src/query/plan/operator.hpp create mode 100644 src/query/plan/operator.lcp diff --git a/.gitignore b/.gitignore index 04424bbf8..18fe12942 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,6 @@ TAGS # Cap'n Proto generated files *.capnp.c++ *.capnp.h + +# LCP generated C++ files +src/query/plan/operator.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 018b6b005..598287ec9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -94,6 +94,8 @@ function(add_lcp lcp_file) set(generated_lcp_files ${generated_lcp_files} ${h_file} PARENT_SCOPE) endfunction(add_lcp) +add_lcp(query/plan/operator.lcp) + add_custom_target(generate_lcp DEPENDS ${generated_lcp_files}) # Use this function to add each capnp file to generation. This way each file is diff --git a/src/lisp/lcp.lisp b/src/lisp/lcp.lisp index 18d19269b..8e2f3fc53 100644 --- a/src/lisp/lcp.lisp +++ b/src/lisp/lcp.lisp @@ -243,17 +243,18 @@ (defun boost-serialization (cpp-class) "Add boost serialization code to `CPP-CLASS'." 
(labels ((get-serialize-code (member-name serialize-fun) - (make-raw-cpp - :string - (if serialize-fun - (etypecase serialize-fun - (string serialize-fun) - (raw-cpp (raw-cpp-string serialize-fun)) - (function - (let ((res (funcall serialize-fun "ar" member-name))) - (check-type res (or raw-cpp string)) - res))) - (format nil "ar & ~A;" member-name)))) + (if serialize-fun + ;; Invoke or use serialize-fun + (ctypecase serialize-fun + ((or string raw-cpp) serialize-fun) + (function + (let ((res (funcall serialize-fun "ar" member-name))) + (check-type res (or raw-cpp string)) + res))) + ;; Else use the default serialization + #>cpp + ar & ${member-name}; + cpp<#)) (save-member (member) (get-serialize-code (cpp-member-name member :struct (cpp-class-structp cpp-class)) diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index b25894277..165d9a8f3 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -1604,10 +1604,6 @@ std::vector Produce::ModifiedSymbols(const SymbolTable &table) const { return OutputSymbols(table); } -const std::vector &Produce::named_expressions() { - return named_expressions_; -} - Produce::ProduceCursor::ProduceCursor(const Produce &self, database::GraphDbAccessor &db) : self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {} diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp deleted file mode 100644 index 12bb7cae9..000000000 --- a/src/query/plan/operator.hpp +++ /dev/null @@ -1,2724 +0,0 @@ -/** @file */ - -#pragma once - -#include -#include -#include -#include -#include -#include - -#include -#include "boost/serialization/base_object.hpp" -#include "boost/serialization/export.hpp" -#include "boost/serialization/serialization.hpp" -#include "boost/serialization/shared_ptr.hpp" -#include "boost/serialization/unique_ptr.hpp" - -#include "distributed/pull_produce_rpc_messages.hpp" -#include "query/common.hpp" -#include "query/frontend/ast/ast.hpp" -#include 
"query/frontend/semantic/symbol.hpp" -#include "query/typed_value.hpp" -#include "storage/types.hpp" -#include "utils/bound.hpp" -#include "utils/future.hpp" -#include "utils/hashing/fnv.hpp" -#include "utils/visitor.hpp" - -namespace database { -class GraphDbAccessor; -} - -namespace query { - -class Context; -class ExpressionEvaluator; -class Frame; -class SymbolTable; - -namespace plan { - -/** @brief Base class for iteration cursors of @c LogicalOperator classes. - * - * Each @c LogicalOperator must produce a concrete @c Cursor, which provides - * the iteration mechanism. - */ -class Cursor { - public: - /** @brief Run an iteration of a @c LogicalOperator. - * - * Since operators may be chained, the iteration may pull results from - * multiple operators. - * - * @param Frame May be read from or written to while performing the - * iteration. - * @param Context Used to get the position of symbols in frame and other - * information. - */ - virtual bool Pull(Frame &, Context &) = 0; - - /** - * Resets the Cursor to it's initial state. 
- */ - virtual void Reset() = 0; - - virtual ~Cursor() {} -}; - -class Once; -class CreateNode; -class CreateExpand; -class ScanAll; -class ScanAllByLabel; -class ScanAllByLabelPropertyRange; -class ScanAllByLabelPropertyValue; -class Expand; -class ExpandVariable; -class ConstructNamedPath; -class Filter; -class Produce; -class Delete; -class SetProperty; -class SetProperties; -class SetLabels; -class RemoveProperty; -class RemoveLabels; -template -class ExpandUniquenessFilter; -class Accumulate; -class Aggregate; -class Skip; -class Limit; -class OrderBy; -class Merge; -class Optional; -class Unwind; -class Distinct; -class CreateIndex; -class Union; -class PullRemote; -class Synchronize; -class Cartesian; -class PullRemoteOrderBy; - -using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor< - Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, - ScanAllByLabelPropertyRange, ScanAllByLabelPropertyValue, Expand, - ExpandVariable, ConstructNamedPath, Filter, Produce, Delete, SetProperty, - SetProperties, SetLabels, RemoveProperty, RemoveLabels, - ExpandUniquenessFilter, - ExpandUniquenessFilter, Accumulate, Aggregate, Skip, Limit, - OrderBy, Merge, Optional, Unwind, Distinct, Union, PullRemote, Synchronize, - Cartesian, PullRemoteOrderBy>; - -using LogicalOperatorLeafVisitor = ::utils::LeafVisitor; - -/** - * @brief Base class for hierarhical visitors of @c LogicalOperator class - * hierarchy. - */ -class HierarchicalLogicalOperatorVisitor - : public LogicalOperatorCompositeVisitor, - public LogicalOperatorLeafVisitor { - public: - using LogicalOperatorCompositeVisitor::PostVisit; - using LogicalOperatorCompositeVisitor::PreVisit; - using LogicalOperatorLeafVisitor::Visit; - using typename LogicalOperatorLeafVisitor::ReturnType; -}; - -/** Base class for logical operators. - * - * Each operator describes an operation, which is to be performed on the - * database. Operators are iterated over using a @c Cursor. 
Various operators - * can serve as inputs to others and thus a sequence of operations is formed. - */ -class LogicalOperator - : public ::utils::Visitable { - public: - virtual ~LogicalOperator() {} - - /** Constructs a @c Cursor which is used to run this operator. - * - * @param database::GraphDbAccessor Used to perform operations on the - * database. - */ - virtual std::unique_ptr MakeCursor( - database::GraphDbAccessor &) const = 0; - - /** Return @c Symbol vector where the query results will be stored. - * - * Currently, outputs symbols are generated in @c Produce and @c Union - * operators. @c Skip, @c Limit, @c OrderBy and @c Distinct propagate the - * symbols from @c Produce (if it exists as input operator). - * - * @param SymbolTable used to find symbols for expressions. - * @return std::vector used for results. - */ - virtual std::vector OutputSymbols(const SymbolTable &) const { - return std::vector(); - } - - /** - * Symbol vector whose values are modified by this operator sub-tree. - * - * This is different than @c OutputSymbols, because it returns all of the - * modified symbols, including those that may not be returned as the - * result of the query. Note that the modified symbols will not contain - * those that should not be read after the operator is processed. - * - * For example, `MATCH (n)-[e]-(m) RETURN n AS l` will generate `ScanAll (n) > - * Expand (e, m) > Produce (l)`. The modified symbols on Produce sub-tree will - * be `l`, the same as output symbols, because it isn't valid to read `n`, `e` - * nor `m` after Produce. On the other hand, modified symbols from Expand - * contain `e` and `m`, as well as `n`, while output symbols are empty. - * Modified symbols from ScanAll contain only `n`, while output symbols are - * also empty. - */ - virtual std::vector ModifiedSymbols(const SymbolTable &) const = 0; - - /** - * Returns true if the operator takes only one input operator. 
- * NOTE: When this method returns true, you may use `input` and `set_input` - * methods. - */ - virtual bool HasSingleInput() const = 0; - - /** - * Returns the input operator if it has any. - * NOTE: This should only be called if `HasSingleInput() == true`. - */ - virtual std::shared_ptr input() const = 0; - /** - * Set a different input on this operator. - * NOTE: This should only be called if `HasSingleInput() == true`. - */ - virtual void set_input(std::shared_ptr) = 0; - - private: - friend class boost::serialization::access; - - template - void serialize(TArchive &, const unsigned int) {} -}; - -template -std::pair, AstTreeStorage> LoadPlan( - TArchive &ar) { - std::unique_ptr root; - ar >> root; - return {std::move(root), std::move(ar.template get_helper( - AstTreeStorage::kHelperId))}; -} - -/** - * A logical operator whose Cursor returns true on the first Pull - * and false on every following Pull. - */ -class Once : public LogicalOperator { - public: - DEFVISITABLE(HierarchicalLogicalOperatorVisitor); - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override { - return {}; - } - - bool HasSingleInput() const override; - std::shared_ptr input() const override; - void set_input(std::shared_ptr) override; - - private: - class OnceCursor : public Cursor { - public: - OnceCursor() {} - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - bool did_pull_{false}; - }; - - friend class boost::serialization::access; - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - } -}; - -/** @brief Operator for creating a node. - * - * This op is used both for creating a single node (`CREATE` statement without - * a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or - * `CREATE (), () ...`). 
- * - * @sa CreateExpand - */ -class CreateNode : public LogicalOperator { - public: - /** - * @param input Optional. If @c nullptr, then a single node will be - * created (a single successful @c Cursor::Pull from this op's @c Cursor). - * If a valid input, then a node will be created for each - * successful pull from the given input. - * @param node_atom @c NodeAtom with information on how to create a node. - * @param on_random_worker If the node should be created locally or on random - * worker. - */ - CreateNode(const std::shared_ptr &input, NodeAtom *node_atom, - bool on_random_worker); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - auto on_random_worker() const { return on_random_worker_; } - void set_on_random_worker(bool v) { on_random_worker_ = v; } - - private: - CreateNode() {} - - std::shared_ptr input_; - NodeAtom *node_atom_ = nullptr; - bool on_random_worker_ = false; - - class CreateNodeCursor : public Cursor { - public: - CreateNodeCursor(const CreateNode &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const CreateNode &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointer(ar, node_atom_); - ar &on_random_worker_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointer(ar, 
node_atom_); - ar &on_random_worker_; - } -}; - -/** @brief Operator for creating edges and destination nodes. - * - * This operator extends already created nodes with an edge. If the other node - * on the edge does not exist, it will be created. For example, in `MATCH (n) - * CREATE (n) -[r:r]-> (n)` query, this operator will create just the edge `r`. - * In `MATCH (n) CREATE (n) -[r:r]-> (m)` query, the operator will create both - * the edge `r` and the node `m`. In case of `CREATE (n) -[r:r]-> (m)` the - * first node `n` is created by @c CreateNode operator, while @c CreateExpand - * will create the edge `r` and `m`. Similarly, multiple @c CreateExpand are - * chained in cases when longer paths need creating. - * - * @sa CreateNode - */ -class CreateExpand : public LogicalOperator { - public: - /** @brief Construct @c CreateExpand. - * - * @param node_atom @c NodeAtom at the end of the edge. Used to create a node, - * unless it refers to an existing one. - * @param edge_atom @c EdgeAtom with information for the edge to be created. - * @param input Optional. Previous @c LogicalOperator which will be pulled. - * For each successful @c Cursor::Pull, this operator will create an - * expansion. - * @param input_symbol @c Symbol for the node at the start of the edge. - * @param existing_node @c bool indicating whether the @c node_atom refers to - * an existing node. If @c false, the operator will also create the node. 
- */ - CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom, - const std::shared_ptr &input, - Symbol input_symbol, bool existing_node); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - // info on what's getting expanded - NodeAtom *node_atom_; - EdgeAtom *edge_atom_; - - // the input op and the symbol under which the op's result - // can be found in the frame - std::shared_ptr input_; - Symbol input_symbol_; - - // if the given node atom refers to an existing node - // (either matched or created) - bool existing_node_; - - CreateExpand() {} - - class CreateExpandCursor : public Cursor { - public: - CreateExpandCursor(const CreateExpand &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const CreateExpand &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - - /** Gets the existing node (if existing_node_ == true), or creates a new - * node (on the given worker) and returns it. */ - VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context); - - /** - * Helper function for creating an edge and adding it - * to the frame. - * - * @param from Origin vertex of the edge. - * @param to Destination vertex of the edge. - * @param evaluator Expression evaluator for property value eval. 
- */ - void CreateEdge(VertexAccessor &from, VertexAccessor &to, Frame &frame, - const SymbolTable &symbol_table, - ExpressionEvaluator &evaluator); - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - SavePointer(ar, node_atom_); - SavePointer(ar, edge_atom_); - ar &input_; - ar &input_symbol_; - ar &existing_node_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - LoadPointer(ar, node_atom_); - LoadPointer(ar, edge_atom_); - ar &input_; - ar &input_symbol_; - ar &existing_node_; - } -}; - -/** - * @brief Operator which iterates over all the nodes currently in the database. - * When given an input (optional), does a cartesian product. - * - * It accepts an optional input. If provided then this op scans all the nodes - * currently in the database for each successful Pull from it's input, thereby - * producing a cartesian product of input Pulls and database elements. - * - * ScanAll can either iterate over the previous graph state (state before - * the current transacton+command) or over current state. This is controlled - * with a constructor argument. 
- * - * @sa ScanAllByLabel - * @sa ScanAllByLabelPropertyRange - * @sa ScanAllByLabelPropertyValue - */ -class ScanAll : public LogicalOperator { - public: - ScanAll(const std::shared_ptr &input, Symbol output_symbol, - GraphView graph_view = GraphView::OLD); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - auto output_symbol() const { return output_symbol_; } - auto graph_view() const { return graph_view_; } - - protected: - std::shared_ptr input_; - Symbol output_symbol_; - /** - * @brief Controls which graph state is used to produce vertices. - * - * If @c GraphView::OLD, @c ScanAll will produce vertices visible in the - * previous graph state, before modifications done by current transaction & - * command. With @c GraphView::NEW, all vertices will be produced the current - * transaction sees along with their modifications. - */ - GraphView graph_view_; - - ScanAll() {} - - private: - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &output_symbol_; - ar &graph_view_; - } -}; - -/** - * @brief Behaves like @c ScanAll, but this operator produces only vertices with - * given label. 
- * - * @sa ScanAll - * @sa ScanAllByLabelPropertyRange - * @sa ScanAllByLabelPropertyValue - */ -class ScanAllByLabel : public ScanAll { - public: - ScanAllByLabel(const std::shared_ptr &input, - Symbol output_symbol, storage::Label label, - GraphView graph_view = GraphView::OLD); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - - storage::Label label() const { return label_; } - - private: - storage::Label label_; - - ScanAllByLabel() {} - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &label_; - } -}; - -/** - * Behaves like @c ScanAll, but produces only vertices with given label and - * property value which is inside a range (inclusive or exlusive). - * - * @sa ScanAll - * @sa ScanAllByLabel - * @sa ScanAllByLabelPropertyValue - */ -class ScanAllByLabelPropertyRange : public ScanAll { - public: - /** Bound with expression which when evaluated produces the bound value. */ - using Bound = utils::Bound; - /** - * Constructs the operator for given label and property value in range - * (inclusive). - * - * Range bounds are optional, but only one bound can be left out. - * - * @param input Preceding operator which will serve as the input. - * @param output_symbol Symbol where the vertices will be stored. - * @param label Label which the vertex must have. - * @param property Property from which the value will be looked up from. - * @param lower_bound Optional lower @c Bound. - * @param upper_bound Optional upper @c Bound. - * @param graph_view GraphView used when obtaining vertices. 
- */ - ScanAllByLabelPropertyRange(const std::shared_ptr &input, - Symbol output_symbol, storage::Label label, - storage::Property property, - std::experimental::optional lower_bound, - std::experimental::optional upper_bound, - GraphView graph_view = GraphView::OLD); - - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - - auto label() const { return label_; } - auto property() const { return property_; } - auto lower_bound() const { return lower_bound_; } - auto upper_bound() const { return upper_bound_; } - - private: - storage::Label label_; - storage::Property property_; - std::experimental::optional lower_bound_; - std::experimental::optional upper_bound_; - - ScanAllByLabelPropertyRange() {} - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &label_; - ar &property_; - auto save_bound = [&ar](auto &maybe_bound) { - if (!maybe_bound) { - ar & false; - return; - } - ar & true; - auto &bound = *maybe_bound; - ar &bound.type(); - SavePointer(ar, bound.value()); - }; - save_bound(lower_bound_); - save_bound(upper_bound_); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &label_; - ar &property_; - auto load_bound = [&ar](auto &maybe_bound) { - bool has_bound = false; - ar &has_bound; - if (!has_bound) { - maybe_bound = std::experimental::nullopt; - return; - } - utils::BoundType type; - ar &type; - Expression *value; - LoadPointer(ar, value); - maybe_bound = std::experimental::make_optional(Bound(value, type)); - }; - load_bound(lower_bound_); - load_bound(upper_bound_); - } -}; - -/** - * Behaves like @c ScanAll, but produces only vertices with given label and - * property value. 
- * - * @sa ScanAll - * @sa ScanAllByLabel - * @sa ScanAllByLabelPropertyRange - */ -class ScanAllByLabelPropertyValue : public ScanAll { - public: - /** - * Constructs the operator for given label and property value. - * - * @param input Preceding operator which will serve as the input. - * @param output_symbol Symbol where the vertices will be stored. - * @param label Label which the vertex must have. - * @param property Property from which the value will be looked up from. - * @param expression Expression producing the value of the vertex property. - * @param graph_view GraphView used when obtaining vertices. - */ - ScanAllByLabelPropertyValue(const std::shared_ptr &input, - Symbol output_symbol, storage::Label label, - storage::Property property, - Expression *expression, - GraphView graph_view = GraphView::OLD); - - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - - auto label() const { return label_; } - auto property() const { return property_; } - auto expression() const { return expression_; } - - private: - storage::Label label_; - storage::Property property_; - Expression *expression_; - - ScanAllByLabelPropertyValue() {} - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &label_; - ar &property_; - SavePointer(ar, expression_); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &label_; - ar &property_; - LoadPointer(ar, expression_); - } -}; - -/** - * Common functionality and data members of single-edge - * and variable-length expansion - */ -class ExpandCommon { - protected: - // types that we'll use for members in both subclasses - using InEdgeT = decltype(std::declval().in()); - using InEdgeIteratorT = 
decltype(std::declval().in().begin()); - using OutEdgeT = decltype(std::declval().out()); - using OutEdgeIteratorT = - decltype(std::declval().out().begin()); - - public: - /** - * Initializes common expansion parameters. - * - * Edge/Node existence are controlled with booleans. 'true' - * denotes that this expansion references an already - * Pulled node/edge, and should only be checked for equality - * during expansion. - * - * Expansion can be done from old or new state of the vertex - * the expansion originates from. This is controlled with a - * constructor argument. - * - * @param node_symbol Symbol pointing to the node to be expanded. This is - * where the new node will be stored. - * @param edge_symbol Symbol for the edges to be expanded. This is where - * a TypedValue containing a list of expanded edges will be stored. - * @param direction EdgeAtom::Direction determining the direction of edge - * expansion. The direction is relative to the starting vertex for each - * expansion. - * @param edge_types storage::EdgeType specifying which edges we - * want to expand. If empty, all edges are valid. If not empty, only edges - * with one of the given types are valid. - * @param input Optional LogicalOperator that preceeds this one. - * @param input_symbol Symbol that points to a VertexAccessor in the Frame - * that expansion should emanate from. - * @param existing_node If or not the node to be expanded is already present - * in the Frame and should just be checked for equality. 
- */ - ExpandCommon(Symbol node_symbol, Symbol edge_symbol, - EdgeAtom::Direction direction, - const std::vector &edge_types, - const std::shared_ptr &input, - Symbol input_symbol, bool existing_node, - GraphView graph_view = GraphView::AS_IS); - - const auto &input_symbol() const { return input_symbol_; } - const auto &node_symbol() const { return node_symbol_; } - const auto &edge_symbol() const { return edge_symbol_; } - const auto &direction() const { return direction_; } - const auto &edge_types() const { return edge_types_; } - - protected: - // info on what's getting expanded - Symbol node_symbol_; - Symbol edge_symbol_; - EdgeAtom::Direction direction_; - std::vector edge_types_; - - // the input op and the symbol under which the op's result - // can be found in the frame - std::shared_ptr input_; - Symbol input_symbol_; - - // If the given node atom refer to a symbol that has already been expanded and - // should be just validated in the frame. - bool existing_node_; - - // from which state the input node should get expanded - GraphView graph_view_; - - /** - * For a newly expanded node handles existence checking and - * frame placement. - * - * @return If or not the given new_node is a valid expansion. It is not - * valid only when matching and existing node and new_node does not match - * the old. - */ - bool HandleExistingNode(const VertexAccessor &new_node, Frame &frame) const; - - ExpandCommon() {} - - private: - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &node_symbol_; - ar &edge_symbol_; - ar &direction_; - ar &edge_types_; - ar &input_; - ar &input_symbol_; - ar &existing_node_; - ar &graph_view_; - } -}; - -/** - * @brief Expansion operator. For a node existing in the frame it - * expands one edge and one node and places them on the frame. - * - * This class does not handle node/edge filtering based on - * properties, labels and edge types. 
However, it does handle - * filtering on existing node / edge. - * - * Filtering on existing means that for a pattern that references - * an already declared node or edge (for example in - * MATCH (a) MATCH (a)--(b)), - * only expansions that match defined equalities are successfully - * pulled. - */ -class Expand : public LogicalOperator, public ExpandCommon { - public: - /** - * Creates an expansion. All parameters are forwarded to @c ExpandCommon and - * are documented there. - */ - using ExpandCommon::ExpandCommon; - - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - class ExpandCursor : public Cursor { - public: - ExpandCursor(const Expand &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - struct FutureExpand { - utils::Future> edge_to; - std::vector frame_elems; - }; - - const Expand &self_; - const std::unique_ptr input_cursor_; - database::GraphDbAccessor &db_; - - // The iterable over edges and the current edge iterator are referenced via - // optional because they can not be initialized in the constructor of - // this class. They are initialized once for each pull from the input. - std::experimental::optional in_edges_; - std::experimental::optional in_edges_it_; - std::experimental::optional out_edges_; - std::experimental::optional out_edges_it_; - // Stores the last frame before we yield the frame for future edge. It needs - // to be restored afterward. - std::vector last_frame_; - // Edges which are being asynchronously fetched from a remote worker. 
- // NOTE: This should be destructed first to ensure that no invalid - // references or pointers exists to other objects of this class. - std::vector future_expands_; - - bool InitEdges(Frame &, Context &); - }; - - private: - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &boost::serialization::base_object(*this); - } -}; - -/** - * Variable-length expansion operator. For a node existing in - * the frame it expands a variable number of edges and places them - * (in a list-type TypedValue), as well as the final destination node, - * on the frame. - * - * This class does not handle node/edge filtering based on - * properties, labels and edge types. However, it does handle - * filtering on existing node / edge. Additionally it handles's - * edge-uniquess (cyphermorphism) because it's not feasable to do - * later. - * - * Filtering on existing means that for a pattern that references - * an already declared node or edge (for example in - * MATCH (a) MATCH (a)--(b)), - * only expansions that match defined equalities are succesfully - * pulled. - */ -class ExpandVariable : public LogicalOperator, public ExpandCommon { - // the Cursors are not declared in the header because - // it's edges_ and edges_it_ are decltyped using a helper function - // that should be inaccessible (private class function won't compile) - friend class ExpandVariableCursor; - friend class ExpandBreadthFirstCursor; - friend class ExpandWeightedShortestPathCursor; - - public: - struct Lambda { - // Symbols for a single node and edge that are currently getting expanded. - Symbol inner_edge_symbol; - Symbol inner_node_symbol; - // Expression used in lambda during expansion. 
- Expression *expression; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &inner_edge_symbol; - ar &inner_node_symbol; - SavePointer(ar, expression); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &inner_edge_symbol; - ar &inner_node_symbol; - LoadPointer(ar, expression); - } - }; - - /** - * Creates a variable-length expansion. Most params are forwarded - * to the @c ExpandCommon constructor, and are documented there. - * - * Expansion length bounds are both inclusive (as in Neo's Cypher - * implementation). - * - * @param type - Either Type::DEPTH_FIRST (default variable-length expansion), - * or Type::BREADTH_FIRST. - * @param is_reverse Set to `true` if the edges written on frame should expand - * from `node_symbol` to `input_symbol`. Opposed to the usual expanding - * from `input_symbol` to `node_symbol`. - * @param lower_bound An optional indicator of the minimum number of edges - * that get expanded (inclusive). - * @param upper_bound An optional indicator of the maximum number of edges - * that get expanded (inclusive). - * @param inner_edge_symbol Like `inner_node_symbol` - * @param inner_node_symbol For each expansion the node expanded into is - * assigned to this symbol so it can be evaulated by the 'where' - * expression. - * @param filter_ The filter that must be satisfied for an expansion to - * succeed. Can use inner(node|edge) symbols. If nullptr, it is ignored. 
- */ - ExpandVariable(Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Type type, - EdgeAtom::Direction direction, - const std::vector &edge_types, - bool is_reverse, Expression *lower_bound, - Expression *upper_bound, - const std::shared_ptr &input, - Symbol input_symbol, bool existing_node, Lambda filter_lambda, - std::experimental::optional weight_lambda, - std::experimental::optional total_weight, - GraphView graph_view = GraphView::AS_IS); - - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - auto type() const { return type_; } - - private: - EdgeAtom::Type type_; - // True if the path should be written as expanding from node_symbol to - // input_symbol. 
- bool is_reverse_; - // lower and upper bounds of the variable length expansion - // both are optional, defaults are (1, inf) - Expression *lower_bound_; - Expression *upper_bound_; - - Lambda filter_lambda_; - std::experimental::optional weight_lambda_; - std::experimental::optional total_weight_; - - ExpandVariable() {} - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &boost::serialization::base_object(*this); - ar &type_; - ar &is_reverse_; - SavePointer(ar, lower_bound_); - SavePointer(ar, upper_bound_); - ar &filter_lambda_; - ar &weight_lambda_; - ar &total_weight_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &boost::serialization::base_object(*this); - ar &type_; - ar &is_reverse_; - LoadPointer(ar, lower_bound_); - LoadPointer(ar, upper_bound_); - ar &filter_lambda_; - ar &weight_lambda_; - ar &total_weight_; - } -}; - -/** - * Constructs a named path from it's elements and places it on the frame. 
- */ -class ConstructNamedPath : public LogicalOperator { - public: - ConstructNamedPath(const std::shared_ptr &input, - Symbol path_symbol, - const std::vector &path_elements) - : input_(input), - path_symbol_(path_symbol), - path_elements_(path_elements) {} - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - const auto &path_symbol() const { return path_symbol_; } - const auto &path_elements() const { return path_elements_; } - - private: - std::shared_ptr input_; - Symbol path_symbol_; - std::vector path_elements_; - - ConstructNamedPath() {} - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &path_symbol_; - ar &path_elements_; - } -}; - -/** - * @brief Filter whose Pull returns true only when the given expression - * evaluates into true. - * - * The given expression is assumed to return either NULL (treated as false) or - * a - * boolean value. 
- */ -class Filter : public LogicalOperator { - public: - Filter(const std::shared_ptr &input_, - Expression *expression_); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - Expression *expression_; - - Filter() {} - - class FilterCursor : public Cursor { - public: - FilterCursor(const Filter &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Filter &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointer(ar, expression_); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointer(ar, expression_); - } -}; - -/** - * @brief A logical operator that places an arbitrary number - * if named expressions on the frame (the logical operator - * for the RETURN clause). - * - * Supports optional input. When the input is provided, - * it is Pulled from and the Produce succeeds once for - * every input Pull (typically a MATCH/RETURN query). - * When the input is not provided (typically a standalone - * RETURN clause) the Produce's pull succeeds exactly once. 
- */ -class Produce : public LogicalOperator { - public: - Produce(const std::shared_ptr &input, - const std::vector &named_expressions); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector OutputSymbols(const SymbolTable &) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - const std::vector &named_expressions(); - - private: - std::shared_ptr input_; - std::vector named_expressions_; - - class ProduceCursor : public Cursor { - public: - ProduceCursor(const Produce &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Produce &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - }; - - Produce() {} - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointers(ar, named_expressions_); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointers(ar, named_expressions_); - } -}; - -/** - * @brief Operator for deleting vertices and edges. - * - * Has a flag for using DETACH DELETE when deleting - * vertices. 
- */ -class Delete : public LogicalOperator { - public: - Delete(const std::shared_ptr &input_, - const std::vector &expressions, bool detach_); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - std::vector expressions_; - // if the vertex should be detached before deletion - // if not detached, and has connections, an error is raised - // ignored when deleting edges - bool detach_; - - Delete() {} - - class DeleteCursor : public Cursor { - public: - DeleteCursor(const Delete &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Delete &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointers(ar, expressions_); - ar &detach_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointers(ar, expressions_); - ar &detach_; - } -}; - -/** - * @brief Logical Op for setting a single property on a single vertex or edge. - * - * The property value is an expression that must evaluate to some type that - * can be stored (a TypedValue that can be converted to PropertyValue). 
- */ -class SetProperty : public LogicalOperator { - public: - SetProperty(const std::shared_ptr &input, - PropertyLookup *lhs, Expression *rhs); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - PropertyLookup *lhs_; - Expression *rhs_; - - SetProperty() {} - - class SetPropertyCursor : public Cursor { - public: - SetPropertyCursor(const SetProperty &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const SetProperty &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointer(ar, lhs_); - SavePointer(ar, rhs_); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointer(ar, lhs_); - LoadPointer(ar, rhs_); - } -}; - -/** - * @brief Logical op for setting the whole properties set on a vertex or an - * edge. - * - * The value being set is an expression that must evaluate to a vertex, edge - * or - * map (literal or parameter). - * - * Supports setting (replacing the whole properties set with another) and - * updating. - */ -class SetProperties : public LogicalOperator { - public: - /** - * @brief Defines how setting the properties works. 
- * - * @c UPDATE means that the current property set is augmented with - * additional - * ones (existing props of the same name are replaced), while @c REPLACE - * means - * that the old props are discarded and replaced with new ones. - */ - enum class Op { UPDATE, REPLACE }; - - SetProperties(const std::shared_ptr &input, - Symbol input_symbol, Expression *rhs, Op op); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - Symbol input_symbol_; - Expression *rhs_; - Op op_; - - SetProperties() {} - - class SetPropertiesCursor : public Cursor { - public: - SetPropertiesCursor(const SetProperties &self, - database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const SetProperties &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - - /** Helper function that sets the given values on either - * a VertexRecord or an EdgeRecord. 
- * @tparam TRecordAccessor Either RecordAccessor or - * RecordAccessor - */ - template - void Set(TRecordAccessor &record, const TypedValue &rhs) const; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &input_symbol_; - SavePointer(ar, rhs_); - ar &op_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &input_symbol_; - LoadPointer(ar, rhs_); - ar &op_; - } -}; - -/** - * @brief Logical operator for setting an arbitrary number of labels on a - * Vertex. - * - * It does NOT remove labels that are already set on that Vertex. - */ -class SetLabels : public LogicalOperator { - public: - SetLabels(const std::shared_ptr &input, Symbol input_symbol, - const std::vector &labels); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - Symbol input_symbol_; - std::vector labels_; - - SetLabels() {} - - class SetLabelsCursor : public Cursor { - public: - SetLabelsCursor(const SetLabels &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const SetLabels &self_; - const std::unique_ptr input_cursor_; - }; - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &input_symbol_; - ar &labels_; - } -}; - -/** - * @brief Logical op for removing a 
property from an - * edge or a vertex. - */ -class RemoveProperty : public LogicalOperator { - public: - RemoveProperty(const std::shared_ptr &input, - PropertyLookup *lhs); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - PropertyLookup *lhs_; - - RemoveProperty() {} - - class RemovePropertyCursor : public Cursor { - public: - RemovePropertyCursor(const RemoveProperty &self, - database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const RemoveProperty &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointer(ar, lhs_); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointer(ar, lhs_); - } -}; - -/** - * @brief Logical operator for removing an arbitrary number of - * labels on a Vertex. - * - * If a label does not exist on a Vertex, nothing happens. 
- */ -class RemoveLabels : public LogicalOperator { - public: - RemoveLabels(const std::shared_ptr &input, - Symbol input_symbol, const std::vector &labels); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - Symbol input_symbol_; - std::vector labels_; - - RemoveLabels() {} - - class RemoveLabelsCursor : public Cursor { - public: - RemoveLabelsCursor(const RemoveLabels &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const RemoveLabels &self_; - const std::unique_ptr input_cursor_; - }; - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &input_symbol_; - ar &labels_; - } -}; - -/** - * Filter whose Pull returns true only when the given - * expand_symbol frame value (the latest expansion) is not - * equal to any of the previous_symbols frame values. - * - * Used for implementing [iso|cypher]morphism. - * Isomorphism is vertex-uniqueness. It means that - * two different vertices in a pattern can not map to the - * same data vertex. For example, if the database - * contains one vertex with a recursive relationship, - * then the query - * MATCH ()-[]->() combined with vertex uniqueness - * yields no results (no uniqueness yields one). - * Cyphermorphism is edge-uniqueness (the above - * explanation applies). By default Neo4j uses - * Cyphermorphism (that's where the name stems from, - * it is not a valid graph-theory term). 
- * - * Works for both Edge and Vertex uniqueness checks - * (provide the accessor type as a template argument). - * Supports variable-length-edges (uniqueness comparisons - * between edges and an edge lists). - */ -template -class ExpandUniquenessFilter : public LogicalOperator { - public: - ExpandUniquenessFilter(const std::shared_ptr &input, - Symbol expand_symbol, - const std::vector &previous_symbols); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - Symbol expand_symbol_; - std::vector previous_symbols_; - - ExpandUniquenessFilter() {} - - class ExpandUniquenessFilterCursor : public Cursor { - public: - ExpandUniquenessFilterCursor(const ExpandUniquenessFilter &self, - database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const ExpandUniquenessFilter &self_; - const std::unique_ptr input_cursor_; - }; - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &expand_symbol_; - ar &previous_symbols_; - } -}; - -/** @brief Pulls everything from the input before passing it through. - * Optionally advances the command after accumulation and before emitting. - * - * On the first Pull from this Op's Cursor the input Cursor will be - * Pulled until it is empty. The results will be accumulated in the - * temporary cache. Once the input Cursor is empty, this Op's Cursor - * will start returning cached stuff from it's Pull. 
- * - * This technique is used for ensuring all the operations from the - * previous LogicalOp have been performed before exposing data - * to the next. A typical use-case is the `MATCH - SET - RETURN` - * query in which every SET iteration must be performed before - * RETURN starts iterating (see Memgraph Wiki for detailed reasoning). - * - * IMPORTANT: This Op does not cache all the results but only those - * elements from the frame whose symbols (frame positions) it was given. - * All other frame positions will contain undefined junk after this - * op has executed, and should not be used. - * - * This op can also advance the command after the accumulation and - * before emitting. If the command gets advanced, every value that - * has been cached will be reconstructed before Pull returns. - * - * @param input Input @c LogicalOperator. - * @param symbols A vector of Symbols that need to be accumulated - * and exposed to the next op. - */ -class Accumulate : public LogicalOperator { - public: - Accumulate(const std::shared_ptr &input, - const std::vector &symbols, bool advance_command = false); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - const auto &symbols() const { return symbols_; }; - auto advance_command() const { return advance_command_; } - - private: - std::shared_ptr input_; - std::vector symbols_; - bool advance_command_; - - Accumulate() {} - - class AccumulateCursor : public Cursor { - public: - AccumulateCursor(const Accumulate &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Accumulate &self_; - 
database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - std::vector> cache_; - decltype(cache_.begin()) cache_it_ = cache_.begin(); - bool pulled_all_input_{false}; - }; - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &symbols_; - ar &advance_command_; - } -}; - -/** - * Custom equality function for a vector of typed values. - * Used in unordered_maps in Aggregate and Distinct operators. - */ -struct TypedValueVectorEqual { - bool operator()(const std::vector &left, - const std::vector &right) const; -}; - -/** @brief Performs an arbitrary number of aggregations of data - * from the given input grouped by the given criteria. - * - * Aggregations are defined by triples that define - * (input data expression, type of aggregation, output symbol). - * Input data is grouped based on the given set of named - * expressions. Grouping is done on unique values. - * - * IMPORTANT: - * Ops taking their input from an aggregation are only - * allowed to use frame values that are either aggregation - * outputs or group-by named-expressions. All other frame - * elements are in an undefined state after aggregation. - */ -class Aggregate : public LogicalOperator { - public: - /** @brief An aggregation element, contains: - * (input data expression, key expression - only used in COLLECT_MAP, type of - * aggregation, output symbol). 
- */ - struct Element { - Expression *value; - Expression *key; - Aggregation::Op op; - Symbol output_sym; - - private: - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - SavePointer(ar, value); - SavePointer(ar, key); - ar &op; - ar &output_sym; - } - - template - void load(TArchive &ar, const unsigned int) { - LoadPointer(ar, value); - LoadPointer(ar, key); - ar &op; - ar &output_sym; - } - }; - - Aggregate(const std::shared_ptr &input, - const std::vector &aggregations, - const std::vector &group_by, - const std::vector &remember); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - const auto &aggregations() const { return aggregations_; } - const auto &group_by() const { return group_by_; } - const auto &remember() const { return remember_; } - - private: - std::shared_ptr input_; - std::vector aggregations_; - std::vector group_by_; - std::vector remember_; - - Aggregate() {} - - class AggregateCursor : public Cursor { - public: - AggregateCursor(const Aggregate &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - // Data structure for a single aggregation cache. - // does NOT include the group-by values since those - // are a key in the aggregation map. - // The vectors in an AggregationValue contain one element - // for each aggregation in this LogicalOp. - struct AggregationValue { - // how many input rows has been aggregated in respective - // values_ element so far - std::vector counts_; - // aggregated values. 
Initially Null (until at least one - // input row with a valid value gets processed) - std::vector values_; - // remember values. - std::vector remember_; - }; - - const Aggregate &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - // storage for aggregated data - // map key is the vector of group-by values - // map value is an AggregationValue struct - std::unordered_map< - std::vector, AggregationValue, - // use FNV collection hashing specialized for a vector of TypedValues - utils::FnvCollection, TypedValue, - TypedValue::Hash>, - // custom equality - TypedValueVectorEqual> - aggregation_; - // iterator over the accumulated cache - decltype(aggregation_.begin()) aggregation_it_ = aggregation_.begin(); - // this LogicalOp pulls all from the input on it's first pull - // this switch tracks if this has been performed - bool pulled_all_input_{false}; - - /** - * Pulls from the input operator until exhausted and aggregates the - * results. If the input operator is not provided, a single call - * to ProcessOne is issued. - * - * Accumulation automatically groups the results so that `aggregation_` - * cache cardinality depends on number of - * aggregation results, and not on the number of inputs. - */ - void ProcessAll(Frame &, Context &); - - /** - * Performs a single accumulation. - */ - void ProcessOne(Frame &frame, const SymbolTable &symbolTable, - ExpressionEvaluator &evaluator); - - /** Ensures the new AggregationValue has been initialized. This means - * that the value vectors are filled with an appropriate number of Nulls, - * counts are set to 0 and remember values are remembered. - */ - void EnsureInitialized(Frame &frame, AggregationValue &agg_value) const; - - /** Updates the given AggregationValue with new data. 
Assumes that - * the AggregationValue has been initialized */ - void Update(Frame &frame, const SymbolTable &symbol_table, - ExpressionEvaluator &evaluator, AggregationValue &agg_value); - - /** Checks if the given TypedValue is legal in MIN and MAX. If not - * an appropriate exception is thrown. */ - void EnsureOkForMinMax(const TypedValue &value) const; - - /** Checks if the given TypedValue is legal in AVG and SUM. If not - * an appropriate exception is thrown. */ - void EnsureOkForAvgSum(const TypedValue &value) const; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &aggregations_; - SavePointers(ar, group_by_); - ar &remember_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &aggregations_; - LoadPointers(ar, group_by_); - ar &remember_; - } -}; - -/** @brief Skips a number of Pulls from the input op. - * - * The given expression determines how many Pulls from the input - * should be skipped (ignored). - * All other successful Pulls from the - * input are simply passed through. - * - * The given expression is evaluated after the first Pull from - * the input, and only once. Neo does not allow this expression - * to contain identifiers, and neither does Memgraph, but this - * operator's implementation does not expect this. 
- */ -class Skip : public LogicalOperator { - public: - Skip(const std::shared_ptr &input, Expression *expression); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector OutputSymbols(const SymbolTable &) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - Expression *expression_; - - Skip() {} - - class SkipCursor : public Cursor { - public: - SkipCursor(const Skip &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Skip &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - // init to_skip_ to -1, indicating - // that it's still unknown (input has not been Pulled yet) - int to_skip_{-1}; - int skipped_{0}; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointer(ar, expression_); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointer(ar, expression_); - } -}; - -/** @brief Limits the number of Pulls from the input op. - * - * The given expression determines how many - * input Pulls should be passed through. The input is not - * Pulled once this limit is reached. Note that this has - * implications: the out-of-bounds input Pulls are never - * evaluated. - * - * The limit expression must NOT use anything from the - * Frame. It is evaluated before the first Pull from the - * input. 
This is consistent with Neo (they don't allow - * identifiers in limit expressions), and it's necessary - * when limit evaluates to 0 (because 0 Pulls from the - * input should be performed). - */ -class Limit : public LogicalOperator { - public: - Limit(const std::shared_ptr &input, Expression *expression); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector OutputSymbols(const SymbolTable &) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - Expression *expression_; - - Limit() {} - - class LimitCursor : public Cursor { - public: - LimitCursor(const Limit &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Limit &self_; - database::GraphDbAccessor &db_; - std::unique_ptr input_cursor_; - // init limit_ to -1, indicating - // that it's still unknown (Cursor has not been Pulled yet) - int limit_{-1}; - int pulled_{0}; - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointer(ar, expression_); - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointer(ar, expression_); - } -}; - -/** @brief Logical operator for ordering (sorting) results. - * - * Sorts the input rows based on an arbitrary number of - * Expressions. Ascending or descending ordering can be chosen - * for each independently (not providing enough orderings - * results in a runtime error). 
- * - * For each row an arbitrary number of Frame elements can be - * remembered. Only these elements (defined by their Symbols) - * are valid for usage after the OrderBy operator. - */ -class OrderBy : public LogicalOperator { - public: - OrderBy(const std::shared_ptr &input, - const std::vector> &order_by, - const std::vector &output_symbols); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector OutputSymbols(const SymbolTable &) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - const auto &order_by() const { return order_by_; } - const auto &compare() const { return compare_; } - const auto &output_symbols() const { return output_symbols_; } - - private: - std::shared_ptr input_; - TypedValueVectorCompare compare_; - std::vector order_by_; - std::vector output_symbols_; - - OrderBy() {} - - class OrderByCursor : public Cursor { - public: - OrderByCursor(const OrderBy &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const OrderBy &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - bool did_pull_all_{false}; - // a cache of elements pulled from the input - // first pair element is the order-by vector - // second pair is the remember vector - // the cache is filled and sorted (only on first pair elem) on first Pull - std::vector, std::vector>> - cache_; - // iterator over the cache_, maintains state between Pulls - decltype(cache_.begin()) cache_it_ = cache_.begin(); - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar 
&boost::serialization::base_object(*this); - ar &input_; - ar &compare_; - SavePointers(ar, order_by_); - ar &output_symbols_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &compare_; - LoadPointers(ar, order_by_); - ar &output_symbols_; - } -}; - -/** - * Merge operator. For every sucessful Pull from the - * input operator a Pull from the merge_match is attempted. All - * successfull Pulls from the merge_match are passed on as output. - * If merge_match Pull does not yield any elements, a single Pull - * from the merge_create op is performed. - * - * The input logical op is optional. If false (nullptr) - * it will be replaced by a Once op. - * - * For an argumentation of this implementation see the wiki - * documentation. - */ -class Merge : public LogicalOperator { - public: - Merge(const std::shared_ptr &input, - const std::shared_ptr &merge_match, - const std::shared_ptr &merge_create); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - // TODO: Consider whether we want to treat Merge as having single input. It - // makes sense that we do, because other branches are executed depending on - // the input. 
- bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - auto merge_match() const { return merge_match_; } - auto merge_create() const { return merge_create_; } - - private: - std::shared_ptr input_; - std::shared_ptr merge_match_; - std::shared_ptr merge_create_; - - Merge() {} - - class MergeCursor : public Cursor { - public: - MergeCursor(const Merge &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const std::unique_ptr input_cursor_; - const std::unique_ptr merge_match_cursor_; - const std::unique_ptr merge_create_cursor_; - - // indicates if the next Pull from this cursor - // should perform a pull from input_cursor_ - // this is true when: - // - first Pulling from this cursor - // - previous Pull from this cursor exhausted the merge_match_cursor - bool pull_input_{true}; - }; - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &merge_match_; - ar &merge_create_; - } -}; - -/** - * Optional operator. Used for optional match. For every - * successful Pull from the input branch a Pull from the optional - * branch is attempted (and Pulled from till exhausted). If zero - * Pulls succeed from the optional branch, the Optional operator - * sets the optional symbols to TypedValue::Null on the Frame - * and returns true, once. 
- */ -class Optional : public LogicalOperator { - public: - Optional(const std::shared_ptr &input, - const std::shared_ptr &optional, - const std::vector &optional_symbols); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - // TODO: Consider whether we want to treat Optional as having single input. - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - auto optional() const { return optional_; } - const auto &optional_symbols() const { return optional_symbols_; } - - private: - std::shared_ptr input_; - std::shared_ptr optional_; - std::vector optional_symbols_; - - Optional() {} - - class OptionalCursor : public Cursor { - public: - OptionalCursor(const Optional &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Optional &self_; - const std::unique_ptr input_cursor_; - const std::unique_ptr optional_cursor_; - // indicates if the next Pull from this cursor should - // perform a Pull from the input_cursor_ - // this is true when: - // - first pulling from this Cursor - // - previous Pull from this cursor exhausted the optional_cursor_ - bool pull_input_{true}; - }; - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &optional_; - ar &optional_symbols_; - } -}; - -/** - * Takes a list TypedValue as it's input and yields each - * element as it's output. - * - * Input is optional (unwind can be the first clause in a query). 
- */ -class Unwind : public LogicalOperator { - public: - Unwind(const std::shared_ptr &input, - Expression *input_expression_, Symbol output_symbol); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - Expression *input_expression() const { return input_expression_; } - - private: - std::shared_ptr input_; - Expression *input_expression_; - Symbol output_symbol_; - - Unwind() {} - - class UnwindCursor : public Cursor { - public: - UnwindCursor(const Unwind &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Unwind &self_; - database::GraphDbAccessor &db_; - const std::unique_ptr input_cursor_; - // typed values we are unwinding and yielding - std::vector input_value_; - // current position in input_value_ - std::vector::iterator input_value_it_ = input_value_.end(); - }; - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - SavePointer(ar, input_expression_); - ar &output_symbol_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - LoadPointer(ar, input_expression_); - ar &output_symbol_; - } -}; - -/** - * Ensures that only distinct rows are yielded. - * This implementation accepts a vector of Symbols - * which define a row. Only those Symbols are valid - * for use in operators following Distinct. - * - * This implementation maintains input ordering. 
- */ -class Distinct : public LogicalOperator { - public: - Distinct(const std::shared_ptr &input, - const std::vector &value_symbols); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector OutputSymbols(const SymbolTable &) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - private: - std::shared_ptr input_; - std::vector value_symbols_; - - Distinct() {} - - class DistinctCursor : public Cursor { - public: - DistinctCursor(const Distinct &self, database::GraphDbAccessor &db); - - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Distinct &self_; - const std::unique_ptr input_cursor_; - // a set of already seen rows - std::unordered_set< - std::vector, - // use FNV collection hashing specialized for a vector of TypedValues - utils::FnvCollection, TypedValue, - TypedValue::Hash>, - TypedValueVectorEqual> - seen_rows_; - }; - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &value_symbols_; - } -}; - -/** - * Creates an index for a combination of label and a property. - * - * This operator takes no input and it shouldn't serve as an input to any - * operator. Pulling from the cursor of this operator will create an index in - * the database for the vertices which have the given label and property. In - * case the index already exists, nothing happens. 
- */ -class CreateIndex : public LogicalOperator { - public: - CreateIndex(storage::Label label, storage::Property property); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override { - return {}; - } - - bool HasSingleInput() const override; - std::shared_ptr input() const override; - void set_input(std::shared_ptr) override; - - auto label() const { return label_; } - auto property() const { return property_; } - - private: - storage::Label label_; - storage::Property property_; - - CreateIndex() {} - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &label_; - ar &property_; - } -}; - -/** - * A logical operator that applies UNION operator on inputs and places the - * result on the frame. - * - * This operator takes two inputs, a vector of symbols for the result, and - * vectors of symbols used by each of the inputs. 
- */ -class Union : public LogicalOperator { - public: - Union(const std::shared_ptr &left_op, - const std::shared_ptr &right_op, - const std::vector &union_symbols, - const std::vector &left_symbols, - const std::vector &right_symbols); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector OutputSymbols(const SymbolTable &) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override; - std::shared_ptr input() const override; - void set_input(std::shared_ptr) override; - - private: - std::shared_ptr left_op_, right_op_; - std::vector union_symbols_, left_symbols_, right_symbols_; - - Union() {} - - class UnionCursor : public Cursor { - public: - UnionCursor(const Union &self, database::GraphDbAccessor &db); - bool Pull(Frame &, Context &) override; - void Reset() override; - - private: - const Union &self_; - const std::unique_ptr left_cursor_, right_cursor_; - }; - - friend class boost::serialization::access; - - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &left_op_; - ar &right_op_; - ar &union_symbols_; - ar &left_symbols_; - ar &right_symbols_; - } -}; - -/** - * An operator in distributed Memgraph that yields both local and remote (from - * other workers) frames. Obtaining remote frames is done through RPC calls to - * `distributed::ProduceRpcServer`s running on all the workers. - * - * This operator aims to yield results as fast as possible and lose minimal - * time on data transfer. It gives no guarantees on result order. 
- */ -class PullRemote : public LogicalOperator { - public: - PullRemote(const std::shared_ptr &input, int64_t plan_id, - const std::vector &symbols) - : input_(input), plan_id_(plan_id), symbols_(symbols) {} - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector OutputSymbols(const SymbolTable &) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - const auto &symbols() const { return symbols_; } - auto plan_id() const { return plan_id_; } - - private: - std::shared_ptr input_; - int64_t plan_id_ = 0; - std::vector symbols_; - - PullRemote() {} - - friend class boost::serialization::access; - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &plan_id_; - ar &symbols_; - } -}; - -/** - * Operator used to synchronize stages of plan execution between the master and - * all the workers. Synchronization is necessary in queries that update that - * graph state because updates (as well as creations and deletions) are deferred - * to avoid multithreaded modification of graph element data (as it's not - * thread-safe). - * - * Logic of the synchronize operator is: - * - * 1. If there is a Pull, tell all the workers to pull on that plan and - * accumulate results without sending them to the master. This is async. - * 2. Accumulate local results, in parallel with 1. getting executed on workers. - * 3. Wait till the master and all the workers are done accumulating. - * 4. Advance the command, if necessary. - * 5. Tell all the workers to apply their updates. This is async. - * 6. Apply local updates, in parallel with 5. on the workers. - * 7. 
Notify workers that the command has advanced, if necessary. - * 8. Yield all the results, first local, then from Pull if available. - */ -class Synchronize : public LogicalOperator { - public: - Synchronize(const std::shared_ptr &input, - const std::shared_ptr &pull_remote, - bool advance_command) - : input_(input), - pull_remote_(pull_remote), - advance_command_(advance_command) {} - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - std::vector OutputSymbols( - const SymbolTable &symbol_table) const override { - return input_->OutputSymbols(symbol_table); - } - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - auto pull_remote() const { return pull_remote_; } - auto advance_command() const { return advance_command_; } - - private: - std::shared_ptr input_; - std::shared_ptr pull_remote_; - bool advance_command_ = false; - - Synchronize() {} - - friend class boost::serialization::access; - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &pull_remote_; - ar &advance_command_; - } -}; - -/** Operator for producing a Cartesian product from 2 input branches */ -class Cartesian : public LogicalOperator { - public: - /** Construct the operator with left input branch and right input branch. 
*/ - Cartesian(const std::shared_ptr &left_op, - const std::vector &left_symbols, - const std::shared_ptr &right_op, - const std::vector &right_symbols) - : left_op_(left_op), - left_symbols_(left_symbols), - right_op_(right_op), - right_symbols_(right_symbols) {} - - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - std::vector ModifiedSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override; - std::shared_ptr input() const override; - void set_input(std::shared_ptr) override; - - auto left_op() const { return left_op_; } - auto left_symbols() const { return left_symbols_; } - auto right_op() const { return right_op_; } - auto right_symbols() const { return right_symbols_; } - - private: - std::shared_ptr left_op_; - std::vector left_symbols_; - std::shared_ptr right_op_; - std::vector right_symbols_; - - Cartesian() {} - - friend class boost::serialization::access; - template - void serialize(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &left_op_; - ar &left_symbols_; - ar &right_op_; - ar &right_symbols_; - } -}; - -/** - * Operator that merges distributed OrderBy operators. - * - * Instead of using a regular OrderBy on master (which would collect all remote - * results and order them), we can have each worker do an OrderBy locally and - * have the master rely on the fact that the results are ordered and merge them - * by having only one result from each worker. 
- */ -class PullRemoteOrderBy : public LogicalOperator { - public: - PullRemoteOrderBy( - const std::shared_ptr &input, int64_t plan_id, - const std::vector> &order_by, - const std::vector &symbols); - bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; - std::unique_ptr MakeCursor( - database::GraphDbAccessor &db) const override; - - std::vector ModifiedSymbols(const SymbolTable &) const override; - std::vector OutputSymbols(const SymbolTable &) const override; - - bool HasSingleInput() const override { return true; } - std::shared_ptr input() const override { return input_; } - void set_input(std::shared_ptr input) override { - input_ = input; - } - - auto plan_id() const { return plan_id_; } - const auto &symbols() const { return symbols_; } - auto order_by() const { return order_by_; } - const auto &compare() const { return compare_; } - - private: - std::shared_ptr input_; - int64_t plan_id_ = 0; - std::vector symbols_; - std::vector order_by_; - TypedValueVectorCompare compare_; - - PullRemoteOrderBy() {} - - friend class boost::serialization::access; - - BOOST_SERIALIZATION_SPLIT_MEMBER(); - - template - void save(TArchive &ar, const unsigned int) const { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &plan_id_; - ar &symbols_; - SavePointers(ar, order_by_); - ar &compare_; - } - - template - void load(TArchive &ar, const unsigned int) { - ar &boost::serialization::base_object(*this); - ar &input_; - ar &plan_id_; - ar &symbols_; - LoadPointers(ar, order_by_); - ar &compare_; - } -}; - -} // namespace plan -} // namespace query - -BOOST_CLASS_EXPORT_KEY(query::plan::Once); -BOOST_CLASS_EXPORT_KEY(query::plan::CreateNode); -BOOST_CLASS_EXPORT_KEY(query::plan::CreateExpand); -BOOST_CLASS_EXPORT_KEY(query::plan::ScanAll); -BOOST_CLASS_EXPORT_KEY(query::plan::ScanAllByLabel); -BOOST_CLASS_EXPORT_KEY(query::plan::ScanAllByLabelPropertyRange); -BOOST_CLASS_EXPORT_KEY(query::plan::ScanAllByLabelPropertyValue); 
-BOOST_CLASS_EXPORT_KEY(query::plan::Expand); -BOOST_CLASS_EXPORT_KEY(query::plan::ExpandVariable); -BOOST_CLASS_EXPORT_KEY(query::plan::Filter); -BOOST_CLASS_EXPORT_KEY(query::plan::Produce); -BOOST_CLASS_EXPORT_KEY(query::plan::ConstructNamedPath); -BOOST_CLASS_EXPORT_KEY(query::plan::Delete); -BOOST_CLASS_EXPORT_KEY(query::plan::SetProperty); -BOOST_CLASS_EXPORT_KEY(query::plan::SetProperties); -BOOST_CLASS_EXPORT_KEY(query::plan::SetLabels); -BOOST_CLASS_EXPORT_KEY(query::plan::RemoveProperty); -BOOST_CLASS_EXPORT_KEY(query::plan::RemoveLabels); -BOOST_CLASS_EXPORT_KEY(query::plan::ExpandUniquenessFilter); -BOOST_CLASS_EXPORT_KEY(query::plan::ExpandUniquenessFilter); -BOOST_CLASS_EXPORT_KEY(query::plan::Accumulate); -BOOST_CLASS_EXPORT_KEY(query::plan::Aggregate); -BOOST_CLASS_EXPORT_KEY(query::plan::Skip); -BOOST_CLASS_EXPORT_KEY(query::plan::Limit); -BOOST_CLASS_EXPORT_KEY(query::plan::OrderBy); -BOOST_CLASS_EXPORT_KEY(query::plan::Merge); -BOOST_CLASS_EXPORT_KEY(query::plan::Optional); -BOOST_CLASS_EXPORT_KEY(query::plan::Unwind); -BOOST_CLASS_EXPORT_KEY(query::plan::Distinct); -BOOST_CLASS_EXPORT_KEY(query::plan::CreateIndex); -BOOST_CLASS_EXPORT_KEY(query::plan::Union); -BOOST_CLASS_EXPORT_KEY(query::plan::PullRemote); -BOOST_CLASS_EXPORT_KEY(query::plan::Synchronize); -BOOST_CLASS_EXPORT_KEY(query::plan::Cartesian); -BOOST_CLASS_EXPORT_KEY(query::plan::PullRemoteOrderBy); diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp new file mode 100644 index 000000000..656bc15b4 --- /dev/null +++ b/src/query/plan/operator.lcp @@ -0,0 +1,2147 @@ +#>cpp +/** @file */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include "boost/serialization/base_object.hpp" +#include "boost/serialization/export.hpp" +#include "boost/serialization/serialization.hpp" +#include "boost/serialization/shared_ptr.hpp" +#include "boost/serialization/unique_ptr.hpp" + +#include "distributed/pull_produce_rpc_messages.hpp" 
+#include "query/common.hpp" +#include "query/frontend/ast/ast.hpp" +#include "query/frontend/semantic/symbol.hpp" +#include "query/typed_value.hpp" +#include "storage/types.hpp" +#include "utils/bound.hpp" +#include "utils/future.hpp" +#include "utils/hashing/fnv.hpp" +#include "utils/visitor.hpp" + +namespace database { +class GraphDbAccessor; +} + +namespace query { + +class Context; +class ExpressionEvaluator; +class Frame; +class SymbolTable; + +namespace plan { + +/** @brief Base class for iteration cursors of @c LogicalOperator classes. + * + * Each @c LogicalOperator must produce a concrete @c Cursor, which provides + * the iteration mechanism. + */ +class Cursor { + public: + /** @brief Run an iteration of a @c LogicalOperator. + * + * Since operators may be chained, the iteration may pull results from + * multiple operators. + * + * @param Frame May be read from or written to while performing the + * iteration. + * @param Context Used to get the position of symbols in frame and other + * information. + */ + virtual bool Pull(Frame &, Context &) = 0; + + /** + * Resets the Cursor to it's initial state. 
+ */ + virtual void Reset() = 0; + + virtual ~Cursor() {} +}; + +class Once; +class CreateNode; +class CreateExpand; +class ScanAll; +class ScanAllByLabel; +class ScanAllByLabelPropertyRange; +class ScanAllByLabelPropertyValue; +class Expand; +class ExpandVariable; +class ConstructNamedPath; +class Filter; +class Produce; +class Delete; +class SetProperty; +class SetProperties; +class SetLabels; +class RemoveProperty; +class RemoveLabels; +template +class ExpandUniquenessFilter; +class Accumulate; +class Aggregate; +class Skip; +class Limit; +class OrderBy; +class Merge; +class Optional; +class Unwind; +class Distinct; +class CreateIndex; +class Union; +class PullRemote; +class Synchronize; +class Cartesian; +class PullRemoteOrderBy; + +using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor< + Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel, + ScanAllByLabelPropertyRange, ScanAllByLabelPropertyValue, Expand, + ExpandVariable, ConstructNamedPath, Filter, Produce, Delete, SetProperty, + SetProperties, SetLabels, RemoveProperty, RemoveLabels, + ExpandUniquenessFilter, + ExpandUniquenessFilter, Accumulate, Aggregate, Skip, Limit, + OrderBy, Merge, Optional, Unwind, Distinct, Union, PullRemote, Synchronize, + Cartesian, PullRemoteOrderBy>; + +using LogicalOperatorLeafVisitor = ::utils::LeafVisitor; + +/** + * @brief Base class for hierarhical visitors of @c LogicalOperator class + * hierarchy. + */ +class HierarchicalLogicalOperatorVisitor + : public LogicalOperatorCompositeVisitor, + public LogicalOperatorLeafVisitor { + public: + using LogicalOperatorCompositeVisitor::PostVisit; + using LogicalOperatorCompositeVisitor::PreVisit; + using LogicalOperatorLeafVisitor::Visit; + using typename LogicalOperatorLeafVisitor::ReturnType; +}; + +/** Base class for logical operators. + * + * Each operator describes an operation, which is to be performed on the + * database. Operators are iterated over using a @c Cursor. 
Various operators + * can serve as inputs to others and thus a sequence of operations is formed. + */ +class LogicalOperator + : public ::utils::Visitable { + public: + virtual ~LogicalOperator() {} + + /** Constructs a @c Cursor which is used to run this operator. + * + * @param database::GraphDbAccessor Used to perform operations on the + * database. + */ + virtual std::unique_ptr MakeCursor( + database::GraphDbAccessor &) const = 0; + + /** Return @c Symbol vector where the query results will be stored. + * + * Currently, outputs symbols are generated in @c Produce and @c Union + * operators. @c Skip, @c Limit, @c OrderBy and @c Distinct propagate the + * symbols from @c Produce (if it exists as input operator). + * + * @param SymbolTable used to find symbols for expressions. + * @return std::vector used for results. + */ + virtual std::vector OutputSymbols(const SymbolTable &) const { + return std::vector(); + } + + /** + * Symbol vector whose values are modified by this operator sub-tree. + * + * This is different than @c OutputSymbols, because it returns all of the + * modified symbols, including those that may not be returned as the + * result of the query. Note that the modified symbols will not contain + * those that should not be read after the operator is processed. + * + * For example, `MATCH (n)-[e]-(m) RETURN n AS l` will generate `ScanAll (n) > + * Expand (e, m) > Produce (l)`. The modified symbols on Produce sub-tree will + * be `l`, the same as output symbols, because it isn't valid to read `n`, `e` + * nor `m` after Produce. On the other hand, modified symbols from Expand + * contain `e` and `m`, as well as `n`, while output symbols are empty. + * Modified symbols from ScanAll contain only `n`, while output symbols are + * also empty. + */ + virtual std::vector ModifiedSymbols(const SymbolTable &) const = 0; + + /** + * Returns true if the operator takes only one input operator. 
+ * NOTE: When this method returns true, you may use `input` and `set_input` + * methods. + */ + virtual bool HasSingleInput() const = 0; + + /** + * Returns the input operator if it has any. + * NOTE: This should only be called if `HasSingleInput() == true`. + */ + virtual std::shared_ptr input() const = 0; + /** + * Set a different input on this operator. + * NOTE: This should only be called if `HasSingleInput() == true`. + */ + virtual void set_input(std::shared_ptr) = 0; + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &, const unsigned int) {} +}; + +template +std::pair, AstTreeStorage> LoadPlan( + TArchive &ar) { + std::unique_ptr root; + ar >> root; + return {std::move(root), std::move(ar.template get_helper( + AstTreeStorage::kHelperId))}; +} +cpp<# + +(defun save-pointer (archive member-name) + #>cpp + SavePointer(${archive}, ${member-name}); + cpp<#) + +(defun load-pointer (archive member-name) + #>cpp + LoadPointer(${archive}, ${member-name}); + cpp<#) + +(defun save-pointers (archive member-name) + #>cpp + SavePointers(${archive}, ${member-name}); + cpp<#) + +(defun load-pointers (archive member-name) + #>cpp + LoadPointers(${archive}, ${member-name}); + cpp<#) + +(lcp:define-class once (logical-operator) + () + (:documentation + "A logical operator whose Cursor returns true on the first Pull +and false on every following Pull.") + (:public + #>cpp + DEFVISITABLE(HierarchicalLogicalOperatorVisitor); + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override { + return {}; + } + + bool HasSingleInput() const override; + std::shared_ptr input() const override; + void set_input(std::shared_ptr) override; + cpp<#) + (:private + #>cpp + class OnceCursor : public Cursor { + public: + OnceCursor() {} + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + bool did_pull_{false}; + }; + cpp<#) + 
(:serialize :boost)) + +(lcp:define-class create-node (logical-operator) + ((input "std::shared_ptr") + (node-atom "NodeAtom *" :initval "nullptr" + :save-fun #'save-pointer :load-fun #'load-pointer) + (on-random-worker :bool :initval "false")) + (:documentation + "Operator for creating a node. + +This op is used both for creating a single node (`CREATE` statement without +a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or +`CREATE (), () ...`). + +@sa CreateExpand") + (:public + #>cpp + /** + * @param input Optional. If @c nullptr, then a single node will be + * created (a single successful @c Cursor::Pull from this op's @c Cursor). + * If a valid input, then a node will be created for each + * successful pull from the given input. + * @param node_atom @c NodeAtom with information on how to create a node. + * @param on_random_worker If the node should be created locally or on random + * worker. + */ + CreateNode(const std::shared_ptr &input, NodeAtom *node_atom, + bool on_random_worker); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + + auto on_random_worker() const { return on_random_worker_; } + void set_on_random_worker(bool v) { on_random_worker_ = v; } + cpp<#) + (:private + #>cpp + CreateNode() {} + + class CreateNodeCursor : public Cursor { + public: + CreateNodeCursor(const CreateNode &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const CreateNode &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class create-expand (logical-operator) + 
( + ;; info on what's getting expanded + (node-atom "NodeAtom *" :save-fun #'save-pointer :load-fun #'load-pointer) + (edge-atom "EdgeAtom *" :save-fun #'save-pointer :load-fun #'load-pointer) + ;; the input op and the symbol under which the op's result + ;; can be found in the frame + (input "std::shared_ptr") + (input-symbol "Symbol") + (existing-node :bool :documentation + "if the given node atom refers to an existing node (either matched or created)")) + (:documentation + "Operator for creating edges and destination nodes. + +This operator extends already created nodes with an edge. If the other node +on the edge does not exist, it will be created. For example, in `MATCH (n) +CREATE (n) -[r:r]-> (n)` query, this operator will create just the edge `r`. +In `MATCH (n) CREATE (n) -[r:r]-> (m)` query, the operator will create both +the edge `r` and the node `m`. In case of `CREATE (n) -[r:r]-> (m)` the +first node `n` is created by @c CreateNode operator, while @c CreateExpand +will create the edge `r` and `m`. Similarly, multiple @c CreateExpand are +chained in cases when longer paths need creating. + +@sa CreateNode") + (:public + #>cpp + /** @brief Construct @c CreateExpand. + * + * @param node_atom @c NodeAtom at the end of the edge. Used to create a node, + * unless it refers to an existing one. + * @param edge_atom @c EdgeAtom with information for the edge to be created. + * @param input Optional. Previous @c LogicalOperator which will be pulled. + * For each successful @c Cursor::Pull, this operator will create an + * expansion. + * @param input_symbol @c Symbol for the node at the start of the edge. + * @param existing_node @c bool indicating whether the @c node_atom refers to + * an existing node. If @c false, the operator will also create the node. 
+ */ + CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom, + const std::shared_ptr &input, + Symbol input_symbol, bool existing_node); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + CreateExpand() {} + + class CreateExpandCursor : public Cursor { + public: + CreateExpandCursor(const CreateExpand &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const CreateExpand &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + + /** Gets the existing node (if existing_node_ == true), or creates a new + * node (on the given worker) and returns it. */ + VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context); + + /** + * Helper function for creating an edge and adding it + * to the frame. + * + * @param from Origin vertex of the edge. + * @param to Destination vertex of the edge. + * @param evaluator Expression evaluator for property value eval. + */ + void CreateEdge(VertexAccessor &from, VertexAccessor &to, Frame &frame, + const SymbolTable &symbol_table, + ExpressionEvaluator &evaluator); + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class scan-all (logical-operator) + ((input "std::shared_ptr" :scope :protected) + (output-symbol "Symbol" :reader t :scope :protected) + (graph-view "GraphView" :reader t :scope :protected + :documentation + "Controls which graph state is used to produce vertices. + +If @c GraphView::OLD, @c ScanAll will produce vertices visible in the previous +graph state, before modifications done by current transaction & command. 
With +@c GraphView::NEW, all vertices will be produced the current transaction sees +along with their modifications.")) + + (:documentation + "Operator which iterates over all the nodes currently in the database. +When given an input (optional), does a cartesian product. + +It accepts an optional input. If provided then this op scans all the nodes +currently in the database for each successful Pull from its input, thereby +producing a cartesian product of input Pulls and database elements. + +ScanAll can either iterate over the previous graph state (state before +the current transaction+command) or over current state. This is controlled +with a constructor argument. + +@sa ScanAllByLabel +@sa ScanAllByLabelPropertyRange +@sa ScanAllByLabelPropertyValue") + (:public + #>cpp + ScanAll(const std::shared_ptr &input, Symbol output_symbol, + GraphView graph_view = GraphView::OLD); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:protected #>cpp ScanAll() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class scan-all-by-label (scan-all) + ((label "storage::Label" :reader t)) + (:documentation + "Behaves like @c ScanAll, but this operator produces only vertices with +given label. 
+ +@sa ScanAll +@sa ScanAllByLabelPropertyRange +@sa ScanAllByLabelPropertyValue") + (:public + #>cpp + ScanAllByLabel(const std::shared_ptr &input, + Symbol output_symbol, storage::Label label, + GraphView graph_view = GraphView::OLD); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + cpp<#) + (:private #>cpp ScanAllByLabel() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class scan-all-by-label-property-range (scan-all) + ((label "storage::Label" :reader t) + (property "storage::Property" :reader t) + (lower-bound "std::experimental::optional" :reader t + :save-fun + #>cpp + auto save_bound = [&ar](auto &maybe_bound) { + if (!maybe_bound) { + ar & false; + return; + } + ar & true; + auto &bound = *maybe_bound; + ar &bound.type(); + SavePointer(ar, bound.value()); + }; + save_bound(lower_bound_); + cpp<# + :load-fun + #>cpp + auto load_bound = [&ar](auto &maybe_bound) { + bool has_bound = false; + ar &has_bound; + if (!has_bound) { + maybe_bound = std::experimental::nullopt; + return; + } + utils::BoundType type; + ar &type; + Expression *value; + LoadPointer(ar, value); + maybe_bound = std::experimental::make_optional(Bound(value, type)); + }; + load_bound(lower_bound_); + cpp<#) + (upper-bound "std::experimental::optional" :reader t + :save-fun #>cpp save_bound(upper_bound_); cpp<# + :load-fun #>cpp load_bound(upper_bound_); cpp<#)) + (:documentation + "Behaves like @c ScanAll, but produces only vertices with given label and +property value which is inside a range (inclusive or exclusive). + +@sa ScanAll +@sa ScanAllByLabel +@sa ScanAllByLabelPropertyValue") + (:public + #>cpp + /** Bound with expression which when evaluated produces the bound value. */ + using Bound = utils::Bound; + /** + * Constructs the operator for given label and property value in range + * (inclusive). + * + * Range bounds are optional, but only one bound can be left out. 
+ * + * @param input Preceding operator which will serve as the input. + * @param output_symbol Symbol where the vertices will be stored. + * @param label Label which the vertex must have. + * @param property Property from which the value will be looked up. + * @param lower_bound Optional lower @c Bound. + * @param upper_bound Optional upper @c Bound. + * @param graph_view GraphView used when obtaining vertices. + */ + ScanAllByLabelPropertyRange(const std::shared_ptr &input, + Symbol output_symbol, storage::Label label, + storage::Property property, + std::experimental::optional lower_bound, + std::experimental::optional upper_bound, + GraphView graph_view = GraphView::OLD); + + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + cpp<#) + (:private #>cpp ScanAllByLabelPropertyRange() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class scan-all-by-label-property-value (scan-all) + ((label "storage::Label" :reader t) + (property "storage::Property" :reader t) + (expression "Expression *" :reader t + :save-fun #'save-pointer :load-fun #'load-pointer)) + (:documentation + "Behaves like @c ScanAll, but produces only vertices with given label and +property value. + +@sa ScanAll +@sa ScanAllByLabel +@sa ScanAllByLabelPropertyRange") + (:public + #>cpp + /** + * Constructs the operator for given label and property value. + * + * @param input Preceding operator which will serve as the input. + * @param output_symbol Symbol where the vertices will be stored. + * @param label Label which the vertex must have. + * @param property Property from which the value will be looked up. + * @param expression Expression producing the value of the vertex property. + * @param graph_view GraphView used when obtaining vertices. 
+ */ + ScanAllByLabelPropertyValue(const std::shared_ptr &input, + Symbol output_symbol, storage::Label label, + storage::Property property, + Expression *expression, + GraphView graph_view = GraphView::OLD); + + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + cpp<#) + (:private #>cpp ScanAllByLabelPropertyValue() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class expand-common () + ( + ;; info on what's getting expanded + (node-symbol "Symbol" :reader t :scope :protected) + (edge-symbol "Symbol" :reader t :scope :protected) + (direction "EdgeAtom::Direction" :reader t :scope :protected) + (edge-types "std::vector" :reader t :scope :protected) + ;; the input op and the symbol under which the op's result + ;; can be found in the frame + (input "std::shared_ptr" :scope :protected) + (input-symbol "Symbol" :reader t :scope :protected) + (existing-node :bool :scope :protected :documentation + "If the given node atom refers to a symbol that has already + been expanded and should be just validated in the frame.") + (graph-view "GraphView" :scope :protected :documentation + "from which state the input node should get expanded")) + (:documentation + "Common functionality and data members of single-edge and variable-length +expansion") + (:public + #>cpp + /** + * Initializes common expansion parameters. + * + * Edge/Node existence is controlled with booleans. 'true' + * denotes that this expansion references an already + * Pulled node/edge, and should only be checked for equality + * during expansion. + * + * Expansion can be done from old or new state of the vertex + * the expansion originates from. This is controlled with a + * constructor argument. + * + * @param node_symbol Symbol pointing to the node to be expanded. This is + * where the new node will be stored. + * @param edge_symbol Symbol for the edges to be expanded. 
This is where + * a TypedValue containing a list of expanded edges will be stored. + * @param direction EdgeAtom::Direction determining the direction of edge + * expansion. The direction is relative to the starting vertex for each + * expansion. + * @param edge_types storage::EdgeType specifying which edges we + * want to expand. If empty, all edges are valid. If not empty, only edges + * with one of the given types are valid. + * @param input Optional LogicalOperator that precedes this one. + * @param input_symbol Symbol that points to a VertexAccessor in the Frame + * that expansion should emanate from. + * @param existing_node Whether the node to be expanded is already present + * in the Frame and should just be checked for equality. + */ + ExpandCommon(Symbol node_symbol, Symbol edge_symbol, + EdgeAtom::Direction direction, + const std::vector &edge_types, + const std::shared_ptr &input, + Symbol input_symbol, bool existing_node, + GraphView graph_view = GraphView::AS_IS); + cpp<#) + (:protected + #>cpp + // types that we'll use for members in both subclasses + using InEdgeT = decltype(std::declval().in()); + using InEdgeIteratorT = decltype(std::declval().in().begin()); + using OutEdgeT = decltype(std::declval().out()); + using OutEdgeIteratorT = + decltype(std::declval().out().begin()); + + /** + * For a newly expanded node handles existence checking and + * frame placement. + * + * @return Whether the given new_node is a valid expansion. It is not + * valid only when matching an existing node and new_node does not match + * the old. + */ + bool HandleExistingNode(const VertexAccessor &new_node, Frame &frame) const; + + ExpandCommon() {} + cpp<#) + (:serialize :boost)) + +(lcp:define-class expand (logical-operator expand-common) + () + (:documentation + "Expansion operator. For a node existing in the frame it +expands one edge and one node and places them on the frame. 
+ +This class does not handle node/edge filtering based on +properties, labels and edge types. However, it does handle +filtering on existing node / edge. + +Filtering on existing means that for a pattern that references +an already declared node or edge (for example in +MATCH (a) MATCH (a)--(b)), +only expansions that match defined equalities are successfully +pulled.") + (:public + #>cpp + /** + * Creates an expansion. All parameters are forwarded to @c ExpandCommon and + * are documented there. + */ + using ExpandCommon::ExpandCommon; + + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + + class ExpandCursor : public Cursor { + public: + ExpandCursor(const Expand &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + struct FutureExpand { + utils::Future> edge_to; + std::vector frame_elems; + }; + + const Expand &self_; + const std::unique_ptr input_cursor_; + database::GraphDbAccessor &db_; + + // The iterable over edges and the current edge iterator are referenced via + // optional because they can not be initialized in the constructor of + // this class. They are initialized once for each pull from the input. + std::experimental::optional in_edges_; + std::experimental::optional in_edges_it_; + std::experimental::optional out_edges_; + std::experimental::optional out_edges_it_; + // Stores the last frame before we yield the frame for future edge. It needs + // to be restored afterward. + std::vector last_frame_; + // Edges which are being asynchronously fetched from a remote worker. 
+ // NOTE: This should be destructed first to ensure that no invalid + // references or pointers exist to other objects of this class. + std::vector future_expands_; + + bool InitEdges(Frame &, Context &); + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class expand-variable (logical-operator expand-common) + ((type "EdgeAtom::Type" :reader t) + (is-reverse :bool :documentation + "True if the path should be written as expanding from node_symbol to input_symbol.") + (lower-bound "Expression *" :save-fun #'save-pointer :load-fun #'load-pointer + :documentation "Optional lower bound of the variable length expansion, defaults are (1, inf)") + (upper-bound "Expression *" :save-fun #'save-pointer :load-fun #'load-pointer + :documentation "Optional upper bound of the variable length expansion, defaults are (1, inf)") + (filter-lambda "Lambda") + (weight-lambda "std::experimental::optional") + (total-weight "std::experimental::optional")) + (:documentation + "Variable-length expansion operator. For a node existing in +the frame it expands a variable number of edges and places them +(in a list-type TypedValue), as well as the final destination node, +on the frame. + +This class does not handle node/edge filtering based on +properties, labels and edge types. However, it does handle +filtering on existing node / edge. Additionally it handles +edge-uniqueness (cyphermorphism) because it's not feasible to do +later. + +Filtering on existing means that for a pattern that references +an already declared node or edge (for example in +MATCH (a) MATCH (a)--(b)), +only expansions that match defined equalities are successfully +pulled.") + (:public + (lcp:define-struct lambda () + ((inner-edge-symbol "Symbol" :documentation "Currently expanded edge symbol.") + (inner-node-symbol "Symbol" :documentation "Currently expanded node symbol.") + (expression "Expression *" :documentation "Expression used in lambda during expansion." 
+ :save-fun #'save-pointer :load-fun #'load-pointer)) + (:serialize :boost)) + #>cpp + /** + * Creates a variable-length expansion. Most params are forwarded + * to the @c ExpandCommon constructor, and are documented there. + * + * Expansion length bounds are both inclusive (as in Neo's Cypher + * implementation). + * + * @param type - Either Type::DEPTH_FIRST (default variable-length expansion), + * or Type::BREADTH_FIRST. + * @param is_reverse Set to `true` if the edges written on frame should expand + * from `node_symbol` to `input_symbol`. Opposed to the usual expanding + * from `input_symbol` to `node_symbol`. + * @param lower_bound An optional indicator of the minimum number of edges + * that get expanded (inclusive). + * @param upper_bound An optional indicator of the maximum number of edges + * that get expanded (inclusive). + * @param inner_edge_symbol Like `inner_node_symbol` + * @param inner_node_symbol For each expansion the node expanded into is + * assigned to this symbol so it can be evaluated by the 'where' + * expression. + * @param filter_ The filter that must be satisfied for an expansion to + * succeed. Can use inner(node|edge) symbols. If nullptr, it is ignored. 
+ */ + ExpandVariable(Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Type type, + EdgeAtom::Direction direction, + const std::vector &edge_types, + bool is_reverse, Expression *lower_bound, + Expression *upper_bound, + const std::shared_ptr &input, + Symbol input_symbol, bool existing_node, Lambda filter_lambda, + std::experimental::optional weight_lambda, + std::experimental::optional total_weight, + GraphView graph_view = GraphView::AS_IS); + + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + // the Cursors are not declared in the header because + // its edges_ and edges_it_ are decltyped using a helper function + // that should be inaccessible (private class function won't compile) + friend class ExpandVariableCursor; + friend class ExpandBreadthFirstCursor; + friend class ExpandWeightedShortestPathCursor; + + ExpandVariable() {} + cpp<#) + (:serialize :boost)) + +(lcp:define-class construct-named-path (logical-operator) + ((input "std::shared_ptr") + (path-symbol "Symbol" :reader t) + (path-elements "std::vector" :reader t)) + (:documentation + "Constructs a named path from its elements and places it on the frame.") + (:public + #>cpp + ConstructNamedPath(const std::shared_ptr &input, + Symbol path_symbol, + const std::vector &path_elements) + : input_(input), + path_symbol_(path_symbol), + path_elements_(path_elements) {} + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { 
return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp ConstructNamedPath() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class filter (logical-operator) + ((input "std::shared_ptr") + (expression "Expression *" + :save-fun #'save-pointer :load-fun #'load-pointer)) + (:documentation + "Filter whose Pull returns true only when the given expression +evaluates into true. + +The given expression is assumed to return either NULL (treated as false) or +a boolean value.") + (:public + #>cpp + Filter(const std::shared_ptr &input_, + Expression *expression_); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Filter() {} + + class FilterCursor : public Cursor { + public: + FilterCursor(const Filter &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Filter &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class produce (logical-operator) + ((input "std::shared_ptr") + (named-expressions "std::vector" :reader t + :save-fun #'save-pointers :load-fun #'load-pointers)) + (:documentation + "A logical operator that places an arbitrary number +of named expressions on the frame (the logical operator +for the RETURN clause). + +Supports optional input. When the input is provided, +it is Pulled from and the Produce succeeds once for +every input Pull (typically a MATCH/RETURN query). 
+When the input is not provided (typically a standalone +RETURN clause) the Produce's pull succeeds exactly once.") + (:public + #>cpp + Produce(const std::shared_ptr &input, + const std::vector &named_expressions); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector OutputSymbols(const SymbolTable &) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + class ProduceCursor : public Cursor { + public: + ProduceCursor(const Produce &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Produce &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + }; + + Produce() {} + cpp<#) + (:serialize :boost)) + +(lcp:define-class delete (logical-operator) + ((input "std::shared_ptr") + (expressions "std::vector" + :save-fun #'save-pointers :load-fun #'load-pointers) + (detach :bool :documentation + "Whether the vertex should be detached before deletion. If not detached, + and has connections, an error is raised. Ignored when deleting edges.")) + (:documentation + "Operator for deleting vertices and edges. 
+ +Has a flag for using DETACH DELETE when deleting vertices.") + (:public + #>cpp + Delete(const std::shared_ptr &input_, + const std::vector &expressions, bool detach_); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Delete() {} + + class DeleteCursor : public Cursor { + public: + DeleteCursor(const Delete &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Delete &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class set-property (logical-operator) + ((input "std::shared_ptr") + (lhs "PropertyLookup *" + :save-fun #'save-pointer :load-fun #'load-pointer) + (rhs "Expression *" + :save-fun #'save-pointer :load-fun #'load-pointer)) + (:documentation + "Logical Op for setting a single property on a single vertex or edge. 
+ +The property value is an expression that must evaluate to some type that +can be stored (a TypedValue that can be converted to PropertyValue).") + (:public + #>cpp + SetProperty(const std::shared_ptr &input, + PropertyLookup *lhs, Expression *rhs); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + SetProperty() {} + + class SetPropertyCursor : public Cursor { + public: + SetPropertyCursor(const SetProperty &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const SetProperty &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class set-properties (logical-operator) + ((input "std::shared_ptr") + (input-symbol "Symbol") + (rhs "Expression *" + :save-fun #'save-pointer :load-fun #'load-pointer) + (op "Op")) + (:documentation + "Logical op for setting the whole properties set on a vertex or an +edge. + +The value being set is an expression that must evaluate to a vertex, edge +or +map (literal or parameter). + +Supports setting (replacing the whole properties set with another) and +updating.") + (:public + #>cpp + /** + * @brief Defines how setting the properties works. + * + * @c UPDATE means that the current property set is augmented with + * additional + * ones (existing props of the same name are replaced), while @c REPLACE + * means + * that the old props are discarded and replaced with new ones. 
+ */ + enum class Op { UPDATE, REPLACE }; + + SetProperties(const std::shared_ptr &input, + Symbol input_symbol, Expression *rhs, Op op); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + SetProperties() {} + + class SetPropertiesCursor : public Cursor { + public: + SetPropertiesCursor(const SetProperties &self, + database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const SetProperties &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + + /** Helper function that sets the given values on either + * a VertexRecord or an EdgeRecord. + * @tparam TRecordAccessor Either RecordAccessor or + * RecordAccessor + */ + template + void Set(TRecordAccessor &record, const TypedValue &rhs) const; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class set-labels (logical-operator) + ((input "std::shared_ptr") + (input-symbol "Symbol") + (labels "std::vector")) + (:documentation + "Logical operator for setting an arbitrary number of labels on a Vertex. 
+ +It does NOT remove labels that are already set on that Vertex.") + (:public + #>cpp + SetLabels(const std::shared_ptr &input, Symbol input_symbol, + const std::vector &labels); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + SetLabels() {} + + class SetLabelsCursor : public Cursor { + public: + SetLabelsCursor(const SetLabels &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const SetLabels &self_; + const std::unique_ptr input_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class remove-property (logical-operator) + ((input "std::shared_ptr") + (lhs "PropertyLookup *" + :save-fun #'save-pointer :load-fun #'load-pointer)) + (:documentation + "Logical op for removing a property from an edge or a vertex.") + (:public + #>cpp + RemoveProperty(const std::shared_ptr &input, + PropertyLookup *lhs); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + RemoveProperty() {} + + class RemovePropertyCursor : public Cursor { + public: + RemovePropertyCursor(const RemoveProperty &self, + database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const RemoveProperty &self_; + 
database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class remove-labels (logical-operator) + ((input "std::shared_ptr") + (input-symbol "Symbol") + (labels "std::vector")) + (:documentation + "Logical operator for removing an arbitrary number of labels on a Vertex. + +If a label does not exist on a Vertex, nothing happens.") + (:public + #>cpp + RemoveLabels(const std::shared_ptr &input, + Symbol input_symbol, const std::vector &labels); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + RemoveLabels() {} + + class RemoveLabelsCursor : public Cursor { + public: + RemoveLabelsCursor(const RemoveLabels &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const RemoveLabels &self_; + const std::unique_ptr input_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class (expand-uniqueness-filter t-accessor) (logical-operator) + ((input "std::shared_ptr") + (expand-symbol "Symbol") + (previous-symbols "std::vector")) + (:documentation + "Filter whose Pull returns true only when the given +expand_symbol frame value (the latest expansion) is not +equal to any of the previous_symbols frame values. + +Used for implementing [iso|cypher]morphism. +Isomorphism is vertex-uniqueness. It means that +two different vertices in a pattern can not map to the +same data vertex. 
For example, if the database +contains one vertex with a recursive relationship, +then the query +MATCH ()-[]->() combined with vertex uniqueness +yields no results (no uniqueness yields one). +Cyphermorphism is edge-uniqueness (the above +explanation applies). By default Neo4j uses +Cyphermorphism (that's where the name stems from, +it is not a valid graph-theory term). + +Works for both Edge and Vertex uniqueness checks +(provide the accessor type as a template argument). +Supports variable-length-edges (uniqueness comparisons +between edges and edge lists).") + (:public + #>cpp + ExpandUniquenessFilter(const std::shared_ptr &input, + Symbol expand_symbol, + const std::vector &previous_symbols); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + ExpandUniquenessFilter() {} + + class ExpandUniquenessFilterCursor : public Cursor { + public: + ExpandUniquenessFilterCursor(const ExpandUniquenessFilter &self, + database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const ExpandUniquenessFilter &self_; + const std::unique_ptr input_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class accumulate (logical-operator) + ((input "std::shared_ptr") + (symbols "std::vector" :reader t) + (advance-command :bool :reader t)) + (:documentation + "Pulls everything from the input before passing it through. +Optionally advances the command after accumulation and before emitting. + +On the first Pull from this Op's Cursor the input Cursor will be +Pulled until it is empty. The results will be accumulated in the +temporary cache. 
Once the input Cursor is empty, this Op's Cursor +will start returning cached stuff from its Pull. + +This technique is used for ensuring all the operations from the +previous LogicalOp have been performed before exposing data +to the next. A typical use-case is the `MATCH - SET - RETURN` +query in which every SET iteration must be performed before +RETURN starts iterating (see Memgraph Wiki for detailed reasoning). + +IMPORTANT: This Op does not cache all the results but only those +elements from the frame whose symbols (frame positions) it was given. +All other frame positions will contain undefined junk after this +op has executed, and should not be used. + +This op can also advance the command after the accumulation and +before emitting. If the command gets advanced, every value that +has been cached will be reconstructed before Pull returns. + +@param input Input @c LogicalOperator. +@param symbols A vector of Symbols that need to be accumulated + and exposed to the next op.") + (:public + #>cpp + Accumulate(const std::shared_ptr &input, + const std::vector &symbols, bool advance_command = false); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Accumulate() {} + + class AccumulateCursor : public Cursor { + public: + AccumulateCursor(const Accumulate &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Accumulate &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + std::vector> cache_; + decltype(cache_.begin()) cache_it_ = cache_.begin(); + bool pulled_all_input_{false}; + }; 
+ cpp<#) + (:serialize :boost)) + +#>cpp +/** + * Custom equality function for a vector of typed values. + * Used in unordered_maps in Aggregate and Distinct operators. + */ +struct TypedValueVectorEqual { + bool operator()(const std::vector &left, + const std::vector &right) const; +}; +cpp<# + +(lcp:define-class aggregate (logical-operator) + ((input "std::shared_ptr") + (aggregations "std::vector" :reader t) + (group-by "std::vector" :reader t + :save-fun #'save-pointers :load-fun #'load-pointers) + (remember "std::vector" :reader t)) + (:documentation + "Performs an arbitrary number of aggregations of data +from the given input grouped by the given criteria. + +Aggregations are defined by triples that define +(input data expression, type of aggregation, output symbol). +Input data is grouped based on the given set of named +expressions. Grouping is done on unique values. + +IMPORTANT: +Ops taking their input from an aggregation are only +allowed to use frame values that are either aggregation +outputs or group-by named-expressions. 
All other frame +elements are in an undefined state after aggregation.") + (:public + (lcp:define-struct element () + ((value "Expression *" + :save-fun #'save-pointer :load-fun #'load-pointer) + (key "Expression *" + :save-fun #'save-pointer :load-fun #'load-pointer) + (op "Aggregation::Op") + (output_sym "Symbol")) + (:documentation + "An aggregation element, contains: + (input data expression, key expression - only used in COLLECT_MAP, type of + aggregation, output symbol).") + (:serialize :boost)) + #>cpp + Aggregate(const std::shared_ptr &input, + const std::vector &aggregations, + const std::vector &group_by, + const std::vector &remember); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Aggregate() {} + + class AggregateCursor : public Cursor { + public: + AggregateCursor(const Aggregate &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + // Data structure for a single aggregation cache. + // does NOT include the group-by values since those + // are a key in the aggregation map. + // The vectors in an AggregationValue contain one element + // for each aggregation in this LogicalOp. + struct AggregationValue { + // how many input rows has been aggregated in respective + // values_ element so far + std::vector counts_; + // aggregated values. Initially Null (until at least one + // input row with a valid value gets processed) + std::vector values_; + // remember values. 
+ std::vector remember_; + }; + + const Aggregate &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + // storage for aggregated data + // map key is the vector of group-by values + // map value is an AggregationValue struct + std::unordered_map< + std::vector, AggregationValue, + // use FNV collection hashing specialized for a vector of TypedValues + utils::FnvCollection, TypedValue, + TypedValue::Hash>, + // custom equality + TypedValueVectorEqual> + aggregation_; + // iterator over the accumulated cache + decltype(aggregation_.begin()) aggregation_it_ = aggregation_.begin(); + // this LogicalOp pulls all from the input on its first pull + // this switch tracks if this has been performed + bool pulled_all_input_{false}; + + /** + * Pulls from the input operator until exhausted and aggregates the + * results. If the input operator is not provided, a single call + * to ProcessOne is issued. + * + * Accumulation automatically groups the results so that `aggregation_` + * cache cardinality depends on number of + * aggregation results, and not on the number of inputs. + */ + void ProcessAll(Frame &, Context &); + + /** + * Performs a single accumulation. + */ + void ProcessOne(Frame &frame, const SymbolTable &symbolTable, + ExpressionEvaluator &evaluator); + + /** Ensures the new AggregationValue has been initialized. This means + * that the value vectors are filled with an appropriate number of Nulls, + * counts are set to 0 and remember values are remembered. + */ + void EnsureInitialized(Frame &frame, AggregationValue &agg_value) const; + + /** Updates the given AggregationValue with new data. Assumes that + * the AggregationValue has been initialized */ + void Update(Frame &frame, const SymbolTable &symbol_table, + ExpressionEvaluator &evaluator, AggregationValue &agg_value); + + /** Checks if the given TypedValue is legal in MIN and MAX. If not + * an appropriate exception is thrown. 
*/ + void EnsureOkForMinMax(const TypedValue &value) const; + + /** Checks if the given TypedValue is legal in AVG and SUM. If not + * an appropriate exception is thrown. */ + void EnsureOkForAvgSum(const TypedValue &value) const; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class skip (logical-operator) + ((input "std::shared_ptr") + (expression "Expression *" + :save-fun #'save-pointer :load-fun #'load-pointer)) + (:documentation + "Skips a number of Pulls from the input op. + +The given expression determines how many Pulls from the input +should be skipped (ignored). +All other successful Pulls from the +input are simply passed through. + +The given expression is evaluated after the first Pull from +the input, and only once. Neo does not allow this expression +to contain identifiers, and neither does Memgraph, but this +operator's implementation does not expect this.") + (:public + #>cpp + Skip(const std::shared_ptr &input, Expression *expression); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector OutputSymbols(const SymbolTable &) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Skip() {} + + class SkipCursor : public Cursor { + public: + SkipCursor(const Skip &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Skip &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + // init to_skip_ to -1, indicating + // that it's still unknown (input has not been Pulled yet) + int to_skip_{-1}; + int skipped_{0}; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class limit (logical-operator) + 
((input "std::shared_ptr") + (expression "Expression *" + :save-fun #'save-pointer :load-fun #'load-pointer)) + (:documentation + "Limits the number of Pulls from the input op. + +The given expression determines how many +input Pulls should be passed through. The input is not +Pulled once this limit is reached. Note that this has +implications: the out-of-bounds input Pulls are never +evaluated. + +The limit expression must NOT use anything from the +Frame. It is evaluated before the first Pull from the +input. This is consistent with Neo (they don't allow +identifiers in limit expressions), and it's necessary +when limit evaluates to 0 (because 0 Pulls from the +input should be performed).") + (:public + #>cpp + Limit(const std::shared_ptr &input, Expression *expression); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector OutputSymbols(const SymbolTable &) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Limit() {} + + class LimitCursor : public Cursor { + public: + LimitCursor(const Limit &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Limit &self_; + database::GraphDbAccessor &db_; + std::unique_ptr input_cursor_; + // init limit_ to -1, indicating + // that it's still unknown (Cursor has not been Pulled yet) + int limit_{-1}; + int pulled_{0}; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class order-by (logical-operator) + ((input "std::shared_ptr") + (compare "TypedValueVectorCompare" :reader t) + (order-by "std::vector" :reader t + :save-fun #'save-pointers :load-fun #'load-pointers) + (output-symbols "std::vector" 
:reader t)) + (:documentation + "Logical operator for ordering (sorting) results. + +Sorts the input rows based on an arbitrary number of +Expressions. Ascending or descending ordering can be chosen +for each independently (not providing enough orderings +results in a runtime error). + +For each row an arbitrary number of Frame elements can be +remembered. Only these elements (defined by their Symbols) +are valid for usage after the OrderBy operator.") + (:public + #>cpp + OrderBy(const std::shared_ptr &input, + const std::vector> &order_by, + const std::vector &output_symbols); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector OutputSymbols(const SymbolTable &) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + OrderBy() {} + + class OrderByCursor : public Cursor { + public: + OrderByCursor(const OrderBy &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const OrderBy &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + bool did_pull_all_{false}; + // a cache of elements pulled from the input + // first pair element is the order-by vector + // second pair is the remember vector + // the cache is filled and sorted (only on first pair elem) on first Pull + std::vector, std::vector>> + cache_; + // iterator over the cache_, maintains state between Pulls + decltype(cache_.begin()) cache_it_ = cache_.begin(); + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class merge (logical-operator) + ((input "std::shared_ptr") + (merge-match "std::shared_ptr" :reader t) + (merge-create "std::shared_ptr" :reader t)) + 
(:documentation + "Merge operator. For every successful Pull from the +input operator a Pull from the merge_match is attempted. All +successful Pulls from the merge_match are passed on as output. +If merge_match Pull does not yield any elements, a single Pull +from the merge_create op is performed. + +The input logical op is optional. If false (nullptr) +it will be replaced by a Once op. + +For an argumentation of this implementation see the wiki +documentation.") + (:public + #>cpp + Merge(const std::shared_ptr &input, + const std::shared_ptr &merge_match, + const std::shared_ptr &merge_create); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + // TODO: Consider whether we want to treat Merge as having single input. It + // makes sense that we do, because other branches are executed depending on + // the input. + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Merge() {} + + class MergeCursor : public Cursor { + public: + MergeCursor(const Merge &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const std::unique_ptr input_cursor_; + const std::unique_ptr merge_match_cursor_; + const std::unique_ptr merge_create_cursor_; + + // indicates if the next Pull from this cursor + // should perform a pull from input_cursor_ + // this is true when: + // - first Pulling from this cursor + // - previous Pull from this cursor exhausted the merge_match_cursor + bool pull_input_{true}; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class optional (logical-operator) + ((input "std::shared_ptr") + (optional "std::shared_ptr" :reader t) + (optional-symbols "std::vector" 
:reader t)) + (:documentation + "Optional operator. Used for optional match. For every +successful Pull from the input branch a Pull from the optional +branch is attempted (and Pulled from till exhausted). If zero +Pulls succeed from the optional branch, the Optional operator +sets the optional symbols to TypedValue::Null on the Frame +and returns true, once.") + (:public + #>cpp + Optional(const std::shared_ptr &input, + const std::shared_ptr &optional, + const std::vector &optional_symbols); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + // TODO: Consider whether we want to treat Optional as having single input. + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Optional() {} + + class OptionalCursor : public Cursor { + public: + OptionalCursor(const Optional &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Optional &self_; + const std::unique_ptr input_cursor_; + const std::unique_ptr optional_cursor_; + // indicates if the next Pull from this cursor should + // perform a Pull from the input_cursor_ + // this is true when: + // - first pulling from this Cursor + // - previous Pull from this cursor exhausted the optional_cursor_ + bool pull_input_{true}; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class unwind (logical-operator) + ((input "std::shared_ptr") + (input-expression "Expression *" :reader t + :save-fun #'save-pointer :load-fun #'load-pointer) + (output-symbol "Symbol")) + (:documentation + "Takes a list TypedValue as its input and yields each +element as its output. 
+ +Input is optional (unwind can be the first clause in a query).") + (:public + #>cpp + Unwind(const std::shared_ptr &input, + Expression *input_expression_, Symbol output_symbol); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Unwind() {} + + class UnwindCursor : public Cursor { + public: + UnwindCursor(const Unwind &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Unwind &self_; + database::GraphDbAccessor &db_; + const std::unique_ptr input_cursor_; + // typed values we are unwinding and yielding + std::vector input_value_; + // current position in input_value_ + std::vector::iterator input_value_it_ = input_value_.end(); + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class distinct (logical-operator) + ((input "std::shared_ptr") + (value-symbols "std::vector")) + (:documentation + "Ensures that only distinct rows are yielded. +This implementation accepts a vector of Symbols +which define a row. Only those Symbols are valid +for use in operators following Distinct. 
+ +This implementation maintains input ordering.") + (:public + #>cpp + Distinct(const std::shared_ptr &input, + const std::vector &value_symbols); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector OutputSymbols(const SymbolTable &) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private + #>cpp + Distinct() {} + + class DistinctCursor : public Cursor { + public: + DistinctCursor(const Distinct &self, database::GraphDbAccessor &db); + + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Distinct &self_; + const std::unique_ptr input_cursor_; + // a set of already seen rows + std::unordered_set< + std::vector, + // use FNV collection hashing specialized for a vector of TypedValues + utils::FnvCollection, TypedValue, + TypedValue::Hash>, + TypedValueVectorEqual> + seen_rows_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class create-index (logical-operator) + ((label "storage::Label" :reader t) + (property "storage::Property" :reader t)) + (:documentation + "Creates an index for a combination of label and a property. + +This operator takes no input and it shouldn't serve as an input to any +operator. Pulling from the cursor of this operator will create an index in +the database for the vertices which have the given label and property. 
In +case the index already exists, nothing happens.") + (:public + #>cpp + CreateIndex(storage::Label label, storage::Property property); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override { + return {}; + } + + bool HasSingleInput() const override; + std::shared_ptr input() const override; + void set_input(std::shared_ptr) override; + cpp<#) + (:private #>cpp CreateIndex() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class union (logical-operator) + ((left-op "std::shared_ptr") + (right-op "std::shared_ptr") + (union-symbols "std::vector") + (left-symbols "std::vector") + (right-symbols "std::vector")) + (:documentation + "A logical operator that applies UNION operator on inputs and places the +result on the frame. + +This operator takes two inputs, a vector of symbols for the result, and +vectors of symbols used by each of the inputs.") + (:public + #>cpp + Union(const std::shared_ptr &left_op, + const std::shared_ptr &right_op, + const std::vector &union_symbols, + const std::vector &left_symbols, + const std::vector &right_symbols); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector OutputSymbols(const SymbolTable &) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override; + std::shared_ptr input() const override; + void set_input(std::shared_ptr) override; + cpp<#) + (:private + #>cpp + Union() {} + + class UnionCursor : public Cursor { + public: + UnionCursor(const Union &self, database::GraphDbAccessor &db); + bool Pull(Frame &, Context &) override; + void Reset() override; + + private: + const Union &self_; + const std::unique_ptr left_cursor_, right_cursor_; + }; + cpp<#) + (:serialize :boost)) + +(lcp:define-class 
pull-remote (logical-operator) + ((input "std::shared_ptr") + (plan-id :int64_t :initval 0 :reader t) + (symbols "std::vector" :reader t)) + (:documentation + "An operator in distributed Memgraph that yields both local and remote (from +other workers) frames. Obtaining remote frames is done through RPC calls to +`distributed::ProduceRpcServer`s running on all the workers. + +This operator aims to yield results as fast as possible and lose minimal +time on data transfer. It gives no guarantees on result order.") + (:public + #>cpp + PullRemote(const std::shared_ptr &input, int64_t plan_id, + const std::vector &symbols) + : input_(input), plan_id_(plan_id), symbols_(symbols) {} + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector OutputSymbols(const SymbolTable &) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private #>cpp PullRemote() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class synchronize (logical-operator) + ((input "std::shared_ptr") + (pull-remote "std::shared_ptr" :reader t) + (advance-command :bool :initval "false" :reader t)) + (:documentation + "Operator used to synchronize stages of plan execution between the master and +all the workers. Synchronization is necessary in queries that update the +graph state because updates (as well as creations and deletions) are deferred +to avoid multithreaded modification of graph element data (as it's not +thread-safe). + +Logic of the synchronize operator is: + +1. If there is a Pull, tell all the workers to pull on that plan and + accumulate results without sending them to the master. This is async. +2. Accumulate local results, in parallel with 1. 
getting executed on workers. +3. Wait till the master and all the workers are done accumulating. +4. Advance the command, if necessary. +5. Tell all the workers to apply their updates. This is async. +6. Apply local updates, in parallel with 5. on the workers. +7. Notify workers that the command has advanced, if necessary. +8. Yield all the results, first local, then from Pull if available.") + (:public + #>cpp + Synchronize(const std::shared_ptr &input, + const std::shared_ptr &pull_remote, + bool advance_command) + : input_(input), + pull_remote_(pull_remote), + advance_command_(advance_command) {} + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + std::vector OutputSymbols( + const SymbolTable &symbol_table) const override { + return input_->OutputSymbols(symbol_table); + } + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private #>cpp Synchronize() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class cartesian (logical-operator) + ((left-op "std::shared_ptr" :reader t) + (left-symbols "std::vector" :reader t) + (right-op "std::shared_ptr" :reader t) + (right-symbols "std::vector" :reader t)) + (:documentation + "Operator for producing a Cartesian product from 2 input branches") + (:public + #>cpp + /** Construct the operator with left input branch and right input branch. 
*/ + Cartesian(const std::shared_ptr &left_op, + const std::vector &left_symbols, + const std::shared_ptr &right_op, + const std::vector &right_symbols) + : left_op_(left_op), + left_symbols_(left_symbols), + right_op_(right_op), + right_symbols_(right_symbols) {} + + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override; + std::shared_ptr input() const override; + void set_input(std::shared_ptr) override; + cpp<#) + (:private #>cpp Cartesian() {} cpp<#) + (:serialize :boost)) + +(lcp:define-class pull-remote-order-by (logical-operator) + ((input "std::shared_ptr") + (plan-id :int64_t :initval 0 :reader t) + (symbols "std::vector" :reader t) + (order-by "std::vector" :reader t + :save-fun #'save-pointers :load-fun #'load-pointers) + (compare "TypedValueVectorCompare" :reader t)) + (:documentation + "Operator that merges distributed OrderBy operators. 
+Instead of using a regular OrderBy on master (which would collect all remote +results and order them), we can have each worker do an OrderBy locally and +have the master rely on the fact that the results are ordered and merge them +by having only one result from each worker.") + (:public + #>cpp + PullRemoteOrderBy( + const std::shared_ptr &input, int64_t plan_id, + const std::vector> &order_by, + const std::vector &symbols); + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + + std::vector ModifiedSymbols(const SymbolTable &) const override; + std::vector OutputSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private #>cpp PullRemoteOrderBy() {} cpp<#) + (:serialize :boost)) + +#>cpp + +} // namespace plan +} // namespace query + +BOOST_CLASS_EXPORT_KEY(query::plan::Once); +BOOST_CLASS_EXPORT_KEY(query::plan::CreateNode); +BOOST_CLASS_EXPORT_KEY(query::plan::CreateExpand); +BOOST_CLASS_EXPORT_KEY(query::plan::ScanAll); +BOOST_CLASS_EXPORT_KEY(query::plan::ScanAllByLabel); +BOOST_CLASS_EXPORT_KEY(query::plan::ScanAllByLabelPropertyRange); +BOOST_CLASS_EXPORT_KEY(query::plan::ScanAllByLabelPropertyValue); +BOOST_CLASS_EXPORT_KEY(query::plan::Expand); +BOOST_CLASS_EXPORT_KEY(query::plan::ExpandVariable); +BOOST_CLASS_EXPORT_KEY(query::plan::Filter); +BOOST_CLASS_EXPORT_KEY(query::plan::Produce); +BOOST_CLASS_EXPORT_KEY(query::plan::ConstructNamedPath); +BOOST_CLASS_EXPORT_KEY(query::plan::Delete); +BOOST_CLASS_EXPORT_KEY(query::plan::SetProperty); +BOOST_CLASS_EXPORT_KEY(query::plan::SetProperties); +BOOST_CLASS_EXPORT_KEY(query::plan::SetLabels); +BOOST_CLASS_EXPORT_KEY(query::plan::RemoveProperty); +BOOST_CLASS_EXPORT_KEY(query::plan::RemoveLabels); 
+BOOST_CLASS_EXPORT_KEY(query::plan::ExpandUniquenessFilter<VertexAccessor>); +BOOST_CLASS_EXPORT_KEY(query::plan::ExpandUniquenessFilter<EdgeAccessor>); +BOOST_CLASS_EXPORT_KEY(query::plan::Accumulate); +BOOST_CLASS_EXPORT_KEY(query::plan::Aggregate); +BOOST_CLASS_EXPORT_KEY(query::plan::Skip); +BOOST_CLASS_EXPORT_KEY(query::plan::Limit); +BOOST_CLASS_EXPORT_KEY(query::plan::OrderBy); +BOOST_CLASS_EXPORT_KEY(query::plan::Merge); +BOOST_CLASS_EXPORT_KEY(query::plan::Optional); +BOOST_CLASS_EXPORT_KEY(query::plan::Unwind); +BOOST_CLASS_EXPORT_KEY(query::plan::Distinct); +BOOST_CLASS_EXPORT_KEY(query::plan::CreateIndex); +BOOST_CLASS_EXPORT_KEY(query::plan::Union); +BOOST_CLASS_EXPORT_KEY(query::plan::PullRemote); +BOOST_CLASS_EXPORT_KEY(query::plan::Synchronize); +BOOST_CLASS_EXPORT_KEY(query::plan::Cartesian); +BOOST_CLASS_EXPORT_KEY(query::plan::PullRemoteOrderBy); +cpp<#