2017-04-11 21:11:48 +08:00
|
|
|
#include <algorithm>
|
|
|
|
|
2017-04-13 16:01:16 +08:00
|
|
|
#include "query/plan/operator.hpp"
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
#include "query/exceptions.hpp"
|
|
|
|
#include "query/frontend/ast/ast.hpp"
|
2017-04-13 16:01:16 +08:00
|
|
|
#include "query/interpret/eval.hpp"
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Generates the default LogicalOperator::Accept implementation for
// `class_name`. If the visitor's PreVisit returns true, the operator itself
// is visited, then its input_ operator accepts the same visitor, and finally
// PostVisit is called. If PreVisit returns false nothing else happens.
#define ACCEPT_WITH_INPUT(class_name)                        \
  void class_name::Accept(LogicalOperatorVisitor &visitor) { \
    if (!visitor.PreVisit(*this)) return;                    \
    visitor.Visit(*this);                                    \
    input_->Accept(visitor);                                 \
    visitor.PostVisit(*this);                                \
  }
|
|
|
|
|
2017-04-26 22:12:39 +08:00
|
|
|
namespace query::plan {
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-21 16:57:53 +08:00
|
|
|
// Accepts a visitor. Once forwards to no input operator here, so only this
// operator is visited (Visit followed by PostVisit), and only when PreVisit
// returns true.
void Once::Accept(LogicalOperatorVisitor &visitor) {
  if (!visitor.PreVisit(*this)) return;
  visitor.Visit(*this);
  visitor.PostVisit(*this);
}
|
2017-04-24 20:37:18 +08:00
|
|
|
// Creates a fresh OnceCursor. The database accessor is not used.
std::unique_ptr<Cursor> Once::MakeCursor(GraphDbAccessor &) {
  std::unique_ptr<Cursor> cursor = std::make_unique<OnceCursor>();
  return cursor;
}
|
|
|
|
|
2017-04-24 20:37:18 +08:00
|
|
|
// Returns true on the first call (since construction or the last Reset) and
// false on every later call. Neither the frame nor the symbol table is used.
bool Once::OnceCursor::Pull(Frame &, const SymbolTable &) {
  const bool first_pull = !did_pull_;
  did_pull_ = true;
  return first_pull;
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Clears the pulled flag so that the next Pull returns true again.
void Once::OnceCursor::Reset() {
  did_pull_ = false;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Stores the node atom describing the node to create. If `input` is null a
// new Once operator is used as the input instead.
CreateNode::CreateNode(const NodeAtom *node_atom,
                       const std::shared_ptr<LogicalOperator> &input)
    : node_atom_(node_atom),
      input_(input ? input : std::make_shared<Once>()) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines CreateNode::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(CreateNode)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates a CreateNodeCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> CreateNode::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<CreateNodeCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps references to the owning operator and the accessor, and obtains a
// cursor from the operator's input.
CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self,
                                               GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
bool CreateNode::CreateNodeCursor::Pull(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-04-21 16:57:53 +08:00
|
|
|
if (input_cursor_->Pull(frame, symbol_table)) {
|
2017-03-30 17:15:57 +08:00
|
|
|
Create(frame, symbol_table);
|
|
|
|
return true;
|
|
|
|
} else
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by forwarding Reset() to the input cursor.
void CreateNode::CreateNodeCursor::Reset() {
  input_cursor_->Reset();
}
|
|
|
|
|
2017-03-30 17:15:57 +08:00
|
|
|
void CreateNode::CreateNodeCursor::Create(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
auto new_node = db_.insert_vertex();
|
|
|
|
for (auto label : self_.node_atom_->labels_) new_node.add_label(label);
|
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
// Evaluator should use the latest accessors, as modified in this query, when
|
|
|
|
// setting properties on new nodes.
|
2017-04-25 21:38:46 +08:00
|
|
|
ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::NEW);
|
2017-03-30 17:15:57 +08:00
|
|
|
for (auto &kv : self_.node_atom_->properties_) {
|
|
|
|
kv.second->Accept(evaluator);
|
|
|
|
new_node.PropsSet(kv.first, evaluator.PopBack());
|
|
|
|
}
|
2017-04-11 18:29:20 +08:00
|
|
|
frame[symbol_table.at(*self_.node_atom_->identifier_)] = new_node;
|
2017-03-30 17:15:57 +08:00
|
|
|
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Stores the node and edge atoms, the symbol of the origin vertex and the
// existing-node flag. If `input` is null a new Once operator is used as the
// input instead.
CreateExpand::CreateExpand(const NodeAtom *node_atom,
                           const EdgeAtom *edge_atom,
                           const std::shared_ptr<LogicalOperator> &input,
                           Symbol input_symbol, bool existing_node)
    : node_atom_(node_atom),
      edge_atom_(edge_atom),
      input_(input ? input : std::make_shared<Once>()),
      input_symbol_(input_symbol),
      existing_node_(existing_node) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines CreateExpand::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(CreateExpand)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates a CreateExpandCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> CreateExpand::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<CreateExpandCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps references to the owning operator and the accessor, and obtains a
// cursor from the operator's input.
CreateExpand::CreateExpandCursor::CreateExpandCursor(const CreateExpand &self,
                                                     GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
|
|
|
|
|
|
|
bool CreateExpand::CreateExpandCursor::Pull(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
if (!input_cursor_->Pull(frame, symbol_table)) return false;
|
|
|
|
|
|
|
|
// get the origin vertex
|
2017-04-03 20:32:29 +08:00
|
|
|
TypedValue &vertex_value = frame[self_.input_symbol_];
|
|
|
|
auto &v1 = vertex_value.Value<VertexAccessor>();
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
// Similarly to CreateNode, newly created edges and nodes should use the
|
|
|
|
// latest accesors.
|
2017-04-25 21:38:46 +08:00
|
|
|
ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::NEW);
|
2017-04-03 20:32:29 +08:00
|
|
|
// E.g. we pickup new properties: `CREATE (n {p: 42}) -[:r {ep: n.p}]-> ()`
|
|
|
|
v1.SwitchNew();
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// get the destination vertex (possibly an existing node)
|
2017-04-03 20:32:29 +08:00
|
|
|
auto &v2 = OtherVertex(frame, symbol_table, evaluator);
|
|
|
|
v2.SwitchNew();
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// create an edge between the two nodes
|
|
|
|
switch (self_.edge_atom_->direction_) {
|
|
|
|
case EdgeAtom::Direction::LEFT:
|
|
|
|
CreateEdge(v2, v1, frame, symbol_table, evaluator);
|
|
|
|
break;
|
|
|
|
case EdgeAtom::Direction::RIGHT:
|
|
|
|
CreateEdge(v1, v2, frame, symbol_table, evaluator);
|
|
|
|
break;
|
|
|
|
case EdgeAtom::Direction::BOTH:
|
2017-04-25 21:22:21 +08:00
|
|
|
// in the case of an undirected CreateExpand we choose an arbitrary
|
|
|
|
// direction. this is used in the MERGE clause
|
|
|
|
// it is not allowed in the CREATE clause, and the semantic
|
|
|
|
// checker needs to ensure it doesn't reach this point
|
|
|
|
CreateEdge(v1, v2, frame, symbol_table, evaluator);
|
2017-03-30 17:15:57 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by forwarding Reset() to the input cursor.
void CreateExpand::CreateExpandCursor::Reset() {
  input_cursor_->Reset();
}
|
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
// Returns the destination vertex of the expansion. If the operator is flagged
// as expanding to an existing node, the vertex already stored in the frame
// under the node atom's identifier is returned. Otherwise a new vertex is
// inserted, given the atom's labels and properties, stored in the frame and
// returned from there.
VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(
    Frame &frame, const SymbolTable &symbol_table,
    ExpressionEvaluator &evaluator) {
  if (self_.existing_node_) {
    TypedValue &existing =
        frame[symbol_table.at(*self_.node_atom_->identifier_)];
    return existing.Value<VertexAccessor>();
  }

  // the node does not exist yet, so it has to be created
  auto created = db_.insert_vertex();
  for (auto node_label : self_.node_atom_->labels_) {
    created.add_label(node_label);
  }
  for (auto property : self_.node_atom_->properties_) {
    property.second->Accept(evaluator);
    created.PropsSet(property.first, evaluator.PopBack());
  }

  auto node_symbol = symbol_table.at(*self_.node_atom_->identifier_);
  frame[node_symbol] = created;
  return frame[node_symbol].Value<VertexAccessor>();
}
|
|
|
|
|
|
|
|
void CreateExpand::CreateExpandCursor::CreateEdge(
|
|
|
|
VertexAccessor &from, VertexAccessor &to, Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table, ExpressionEvaluator &evaluator) {
|
2017-03-30 17:15:57 +08:00
|
|
|
EdgeAccessor edge =
|
|
|
|
db_.insert_edge(from, to, self_.edge_atom_->edge_types_[0]);
|
|
|
|
for (auto kv : self_.edge_atom_->properties_) {
|
|
|
|
kv.second->Accept(evaluator);
|
|
|
|
edge.PropsSet(kv.first, evaluator.PopBack());
|
|
|
|
}
|
2017-04-11 18:29:20 +08:00
|
|
|
frame[symbol_table.at(*self_.edge_atom_->identifier_)] = edge;
|
2017-03-30 17:15:57 +08:00
|
|
|
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Stores the node atom and graph view. If `input` is null a new Once operator
// is used as the input instead. The graph view must not be AS_IS; this is
// enforced with a permanent assert.
ScanAll::ScanAll(const NodeAtom *node_atom,
                 const std::shared_ptr<LogicalOperator> &input,
                 GraphView graph_view)
    : node_atom_(node_atom),
      input_(input ? input : std::make_shared<Once>()),
      graph_view_(graph_view) {
  permanent_assert(graph_view != GraphView::AS_IS,
                   "ScanAll must have explicitly defined GraphView")
}
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines ScanAll::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(ScanAll)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates a ScanAllCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> ScanAll::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<ScanAllCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Obtains the input cursor and the vertex collection from the accessor. The
// vertex iterator starts at end(), so the first Pull has to pull from the
// input before scanning. Throws utils::NotYetImplemented when the operator's
// graph view is NEW.
ScanAll::ScanAllCursor::ScanAllCursor(const ScanAll &self, GraphDbAccessor &db)
    : self_(self),
      input_cursor_(self.input_->MakeCursor(db)),
      // TODO change to db.vertices(self.graph_view_ == GraphView::NEW)
      // once this GraphDbAccessor API is available
      vertices_(db.vertices()),
      vertices_it_(vertices_.end()) {
  if (self.graph_view_ == GraphView::NEW) {
    throw utils::NotYetImplemented();
  }
}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Writes the next vertex into the frame slot of the node atom's identifier.
// Whenever the vertex iterator is at the end, one frame is pulled from the
// input and the iterator restarts from the beginning. Returns false when the
// input is exhausted or when there are no vertices at all.
bool ScanAll::ScanAllCursor::Pull(Frame &frame,
                                  const SymbolTable &symbol_table) {
  if (vertices_it_ == vertices_.end()) {
    if (!input_cursor_->Pull(frame, symbol_table)) return false;
    vertices_it_ = vertices_.begin();
    // an empty vertices_ means we are done even though the iterator has just
    // been reinitialized
    if (vertices_it_ == vertices_.end()) return false;
  }

  frame[symbol_table.at(*self_.node_atom_->identifier_)] = *vertices_it_++;
  return true;
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets the input cursor and parks the vertex iterator at end(), which makes
// the next Pull start over by pulling from the input.
void ScanAll::ScanAllCursor::Reset() {
  input_cursor_->Reset();
  vertices_it_ = vertices_.end();
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Stores the node and edge atoms, the symbol of the expansion origin, the
// existing-node / existing-edge flags and the graph view. If `input` is null
// a new Once operator is used as the input instead.
Expand::Expand(const NodeAtom *node_atom, const EdgeAtom *edge_atom,
               const std::shared_ptr<LogicalOperator> &input,
               Symbol input_symbol, bool existing_node, bool existing_edge,
               GraphView graph_view)
    : node_atom_(node_atom),
      edge_atom_(edge_atom),
      input_(input ? input : std::make_shared<Once>()),
      input_symbol_(input_symbol),
      existing_node_(existing_node),
      existing_edge_(existing_edge),
      graph_view_(graph_view) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines Expand::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(Expand)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates an ExpandCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> Expand::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<ExpandCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps a reference to the owning operator and obtains a cursor from the
// operator's input.
Expand::ExpandCursor::ExpandCursor(const Expand &self, GraphDbAccessor &db)
    : self_(self),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Produces the next expansion. Incoming edges are consumed first, then
// outgoing ones; once both are exhausted (or not yet initialized) a new
// origin is pulled from the input through InitEdges. Returns false when
// InitEdges reports that the input is exhausted.
bool Expand::ExpandCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
  for (;;) {
    // try to take the next incoming edge
    if (in_edges_ && *in_edges_it_ != in_edges_->end()) {
      EdgeAccessor edge = *(*in_edges_it_)++;
      if (HandleExistingEdge(edge, frame, symbol_table) &&
          PullNode(edge, EdgeAtom::Direction::LEFT, frame, symbol_table)) {
        return true;
      }
      continue;
    }

    // try to take the next outgoing edge
    if (out_edges_ && *out_edges_it_ != out_edges_->end()) {
      EdgeAccessor edge = *(*out_edges_it_)++;
      // When expanding in EdgeAtom::Direction::BOTH directions a cycle must
      // be expanded only once, and that already happened in the block above.
      if (self_.edge_atom_->direction_ == EdgeAtom::Direction::BOTH &&
          edge.is_cycle()) {
        continue;
      }
      if (HandleExistingEdge(edge, frame, symbol_table) &&
          PullNode(edge, EdgeAtom::Direction::RIGHT, frame, symbol_table)) {
        return true;
      }
      continue;
    }

    // Reaching this point means the edges are either uninitialized or
    // exhausted. Try to (re)initialize them; stop if the input is exhausted.
    if (!InitEdges(frame, symbol_table)) return false;

    // edges were re-initialized, go around the loop again
  }
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets the input cursor and drops the current edge collections and their
// iterators, so that the next Pull re-initializes them through InitEdges.
void Expand::ExpandCursor::Reset() {
  input_cursor_->Reset();
  // reset() destroys the owned objects. release() would only give up
  // ownership and return the raw pointer, leaking the allocations made with
  // std::make_unique in InitEdges. Iterators are dropped before the
  // collections they were created from.
  in_edges_it_.reset();
  in_edges_.reset();
  out_edges_it_.reset();
  out_edges_.reset();
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Pulls the next expansion origin from the input and prepares the edge
// collections and iterators that match the edge atom's direction. Returns
// false when the input is exhausted, true otherwise.
bool Expand::ExpandCursor::InitEdges(Frame &frame,
                                     const SymbolTable &symbol_table) {
  if (!input_cursor_->Pull(frame, symbol_table)) return false;

  auto &origin = frame[self_.input_symbol_].Value<VertexAccessor>();

  // put the expansion origin vertex into the desired state
  switch (self_.graph_view_) {
    case GraphView::NEW:
      origin.SwitchNew();
      break;
    case GraphView::OLD:
      origin.SwitchOld();
      break;
    case GraphView::AS_IS:
      break;
  }

  auto direction = self_.edge_atom_->direction_;
  const bool both_ways = direction == EdgeAtom::Direction::BOTH;

  if (direction == EdgeAtom::Direction::LEFT || both_ways) {
    in_edges_ = std::make_unique<InEdgeT>(origin.in());
    in_edges_it_ = std::make_unique<InEdgeIteratorT>(in_edges_->begin());
  }

  if (direction == EdgeAtom::Direction::RIGHT || both_ways) {
    out_edges_ = std::make_unique<InEdgeT>(origin.out());
    out_edges_it_ = std::make_unique<InEdgeIteratorT>(out_edges_->begin());
  }

  // TODO add support for Front and Back expansion (when QueryPlanner
  // will need it). For now only Back expansion (left to right) is
  // supported
  // TODO add support for named paths

  return true;
}
|
|
|
|
|
2017-04-24 19:56:50 +08:00
|
|
|
bool Expand::ExpandCursor::HandleExistingEdge(const EdgeAccessor &new_edge,
|
|
|
|
Frame &frame,
|
|
|
|
const SymbolTable &symbol_table) {
|
|
|
|
if (self_.existing_edge_) {
|
2017-03-30 17:15:57 +08:00
|
|
|
TypedValue &old_edge_value =
|
2017-04-11 18:29:20 +08:00
|
|
|
frame[symbol_table.at(*self_.edge_atom_->identifier_)];
|
2017-03-30 17:15:57 +08:00
|
|
|
return old_edge_value.Value<EdgeAccessor>() == new_edge;
|
|
|
|
} else {
|
2017-04-24 19:56:50 +08:00
|
|
|
// not matching existing, so put the new_edge into the frame and return true
|
2017-04-11 18:29:20 +08:00
|
|
|
frame[symbol_table.at(*self_.edge_atom_->identifier_)] = new_edge;
|
2017-03-30 17:15:57 +08:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
bool Expand::ExpandCursor::PullNode(const EdgeAccessor &new_edge,
|
2017-03-30 17:15:57 +08:00
|
|
|
EdgeAtom::Direction direction, Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
switch (direction) {
|
|
|
|
case EdgeAtom::Direction::LEFT:
|
2017-04-24 19:56:50 +08:00
|
|
|
return HandleExistingNode(new_edge.from(), frame, symbol_table);
|
2017-03-30 17:15:57 +08:00
|
|
|
case EdgeAtom::Direction::RIGHT:
|
2017-04-24 19:56:50 +08:00
|
|
|
return HandleExistingNode(new_edge.to(), frame, symbol_table);
|
2017-03-30 17:15:57 +08:00
|
|
|
case EdgeAtom::Direction::BOTH:
|
|
|
|
permanent_fail("Must indicate exact expansion direction here");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-24 19:56:50 +08:00
|
|
|
bool Expand::ExpandCursor::HandleExistingNode(const VertexAccessor new_node,
|
|
|
|
Frame &frame,
|
|
|
|
const SymbolTable &symbol_table) {
|
|
|
|
if (self_.existing_node_) {
|
2017-03-30 17:15:57 +08:00
|
|
|
TypedValue &old_node_value =
|
2017-04-11 18:29:20 +08:00
|
|
|
frame[symbol_table.at(*self_.node_atom_->identifier_)];
|
2017-03-30 17:15:57 +08:00
|
|
|
return old_node_value.Value<VertexAccessor>() == new_node;
|
|
|
|
} else {
|
2017-04-24 19:56:50 +08:00
|
|
|
// not matching existing, so put the new_edge into the frame and return true
|
2017-04-11 18:29:20 +08:00
|
|
|
frame[symbol_table.at(*self_.node_atom_->identifier_)] = new_node;
|
2017-03-30 17:15:57 +08:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Stores the input operator, the symbol of the vertex to filter and the node
// atom that holds the filtering criteria.
NodeFilter::NodeFilter(const std::shared_ptr<LogicalOperator> &input,
                       Symbol input_symbol, const NodeAtom *node_atom)
    : input_(input),
      input_symbol_(input_symbol),
      node_atom_(node_atom) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines NodeFilter::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(NodeFilter)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates a NodeFilterCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> NodeFilter::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<NodeFilterCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps references to the owning operator and the accessor, and obtains a
// cursor from the operator's input.
NodeFilter::NodeFilterCursor::NodeFilterCursor(const NodeFilter &self,
                                               GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
bool NodeFilter::NodeFilterCursor::Pull(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
while (input_cursor_->Pull(frame, symbol_table)) {
|
2017-04-03 20:32:29 +08:00
|
|
|
auto &vertex = frame[self_.input_symbol_].Value<VertexAccessor>();
|
|
|
|
// Filter needs to use the old, unmodified vertex, even though we may change
|
|
|
|
// properties or labels during the current command.
|
|
|
|
vertex.SwitchOld();
|
2017-03-30 17:15:57 +08:00
|
|
|
if (VertexPasses(vertex, frame, symbol_table)) return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by forwarding Reset() to the input cursor.
void NodeFilter::NodeFilterCursor::Reset() {
  input_cursor_->Reset();
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
bool NodeFilter::NodeFilterCursor::VertexPasses(
|
|
|
|
const VertexAccessor &vertex, Frame &frame,
|
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
for (auto label : self_.node_atom_->labels_)
|
|
|
|
if (!vertex.has_label(label)) return false;
|
|
|
|
|
2017-04-25 21:38:46 +08:00
|
|
|
ExpressionEvaluator expression_evaluator(frame, symbol_table, db_,
|
|
|
|
GraphView::OLD);
|
2017-04-03 20:32:29 +08:00
|
|
|
// We don't want newly set properties to affect filtering.
|
2017-03-30 17:15:57 +08:00
|
|
|
for (auto prop_pair : self_.node_atom_->properties_) {
|
|
|
|
prop_pair.second->Accept(expression_evaluator);
|
|
|
|
TypedValue comparison_result =
|
|
|
|
vertex.PropsAt(prop_pair.first) == expression_evaluator.PopBack();
|
2017-04-20 19:31:18 +08:00
|
|
|
if (comparison_result.IsNull() || !comparison_result.Value<bool>())
|
2017-03-30 17:15:57 +08:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Stores the input operator, the symbol of the edge to filter and the edge
// atom that holds the filtering criteria.
EdgeFilter::EdgeFilter(const std::shared_ptr<LogicalOperator> &input,
                       Symbol input_symbol, const EdgeAtom *edge_atom)
    : input_(input),
      input_symbol_(input_symbol),
      edge_atom_(edge_atom) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines EdgeFilter::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(EdgeFilter)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates an EdgeFilterCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> EdgeFilter::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<EdgeFilterCursor>(*this, db);
  return cursor;
}
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps references to the owning operator and the accessor, and obtains a
// cursor from the operator's input.
EdgeFilter::EdgeFilterCursor::EdgeFilterCursor(const EdgeFilter &self,
                                               GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
bool EdgeFilter::EdgeFilterCursor::Pull(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
while (input_cursor_->Pull(frame, symbol_table)) {
|
2017-04-03 20:32:29 +08:00
|
|
|
auto &edge = frame[self_.input_symbol_].Value<EdgeAccessor>();
|
|
|
|
// Filter needs to use the old, unmodified edge, even though we may change
|
|
|
|
// properties or types during the current command.
|
|
|
|
edge.SwitchOld();
|
2017-03-30 17:15:57 +08:00
|
|
|
if (EdgePasses(edge, frame, symbol_table)) return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by forwarding Reset() to the input cursor.
void EdgeFilter::EdgeFilterCursor::Reset() {
  input_cursor_->Reset();
}
|
|
|
|
|
2017-03-30 17:15:57 +08:00
|
|
|
bool EdgeFilter::EdgeFilterCursor::EdgePasses(const EdgeAccessor &edge,
|
|
|
|
Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
// edge type filtering - logical OR
|
|
|
|
const auto &types = self_.edge_atom_->edge_types_;
|
|
|
|
GraphDbTypes::EdgeType type = edge.edge_type();
|
2017-04-12 15:01:44 +08:00
|
|
|
if (types.size() && std::none_of(types.begin(), types.end(),
|
|
|
|
[type](auto t) { return t == type; }))
|
2017-03-30 17:15:57 +08:00
|
|
|
return false;
|
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
// We don't want newly set properties to affect filtering.
|
2017-04-25 21:38:46 +08:00
|
|
|
ExpressionEvaluator expression_evaluator(frame, symbol_table, db_,
|
|
|
|
GraphView::OLD);
|
2017-03-30 17:15:57 +08:00
|
|
|
for (auto prop_pair : self_.edge_atom_->properties_) {
|
|
|
|
prop_pair.second->Accept(expression_evaluator);
|
|
|
|
TypedValue comparison_result =
|
|
|
|
edge.PropsAt(prop_pair.first) == expression_evaluator.PopBack();
|
2017-04-20 19:31:18 +08:00
|
|
|
if (comparison_result.IsNull() || !comparison_result.Value<bool>())
|
2017-03-30 17:15:57 +08:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Stores the input operator and the expression used for filtering. The
// parameters share their names with the members they initialize.
Filter::Filter(const std::shared_ptr<LogicalOperator> &input_,
               Expression *expression_)
    : input_(input_),
      expression_(expression_) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines Filter::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(Filter)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates a FilterCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> Filter::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<FilterCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps references to the owning operator and the accessor, and obtains a
// cursor from the operator's input.
Filter::FilterCursor::FilterCursor(const Filter &self, GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps pulling from the input until the filter expression evaluates to a
// non-Null true value for the pulled frame, then returns true. Returns false
// once the input is exhausted.
bool Filter::FilterCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
  // As with all filters, newly set values should not affect the filtering of
  // old nodes and edges, hence the OLD graph view.
  ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::OLD);
  while (input_cursor_->Pull(frame, symbol_table)) {
    self_.expression_->Accept(evaluator);
    TypedValue verdict = evaluator.PopBack();
    if (!verdict.IsNull() && verdict.Value<bool>()) return true;
  }
  return false;
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by forwarding Reset() to the input cursor.
void Filter::FilterCursor::Reset() {
  input_cursor_->Reset();
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Stores the named expressions to produce. If `input` is null a new Once
// operator is used as the input instead.
Produce::Produce(const std::shared_ptr<LogicalOperator> &input,
                 const std::vector<NamedExpression *> named_expressions)
    : input_(input ? input : std::make_shared<Once>()),
      named_expressions_(named_expressions) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines Produce::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(Produce)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates a ProduceCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> Produce::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<ProduceCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-26 22:12:39 +08:00
|
|
|
std::vector<Symbol> Produce::OutputSymbols(const SymbolTable &symbol_table) {
|
|
|
|
std::vector<Symbol> symbols;
|
|
|
|
for (const auto &named_expr : named_expressions_) {
|
|
|
|
symbols.emplace_back(symbol_table.at(*named_expr));
|
|
|
|
}
|
|
|
|
return symbols;
|
|
|
|
}
|
|
|
|
|
2017-03-30 17:15:57 +08:00
|
|
|
const std::vector<NamedExpression *> &Produce::named_expressions() {
|
|
|
|
return named_expressions_;
|
|
|
|
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps references to the owning operator and the accessor, and obtains a
// cursor from the operator's input.
Produce::ProduceCursor::ProduceCursor(const Produce &self, GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
2017-04-24 16:16:53 +08:00
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Pulls one frame from the input and evaluates every named expression on it.
// Returns false when the input is exhausted.
bool Produce::ProduceCursor::Pull(Frame &frame,
                                  const SymbolTable &symbol_table) {
  if (!input_cursor_->Pull(frame, symbol_table)) return false;

  // Produce should always yield the latest results, hence the NEW graph view.
  ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::NEW);
  for (auto expression : self_.named_expressions_) {
    expression->Accept(evaluator);
  }
  return true;
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by forwarding Reset() to the input cursor.
void Produce::ProduceCursor::Reset() {
  input_cursor_->Reset();
}
|
|
|
|
|
2017-03-30 17:15:57 +08:00
|
|
|
// Stores the input operator, the expressions whose results get deleted and
// the detach flag. Two parameters share their names with the members they
// initialize.
Delete::Delete(const std::shared_ptr<LogicalOperator> &input_,
               const std::vector<Expression *> &expressions, bool detach_)
    : input_(input_),
      expressions_(expressions),
      detach_(detach_) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines Delete::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(Delete)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates a DeleteCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> Delete::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<DeleteCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps references to the owning operator and the accessor, and obtains a
// cursor from the operator's input.
Delete::DeleteCursor::DeleteCursor(const Delete &self, GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Pulls one frame from the input, evaluates all delete expressions on it and
// removes the resulting edges and vertices. Returns false when the input is
// exhausted.
// Throws QueryRuntimeException when a vertex cannot be removed without
// detaching, and TypedValueException when an expression result is neither a
// vertex nor an edge.
bool Delete::DeleteCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
  if (!input_cursor_->Pull(frame, symbol_table)) return false;

  // Delete should see the latest information; this also makes it possible to
  // delete newly added nodes and edges.
  ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::NEW);

  // All results are collected up front so that edges can be deleted before
  // vertices. This is necessary because an edge that gets deleted could
  // otherwise block vertex deletion.
  std::vector<TypedValue> targets;
  targets.reserve(self_.expressions_.size());
  for (Expression *expression : self_.expressions_) {
    expression->Accept(evaluator);
    targets.emplace_back(evaluator.PopBack());
  }

  // first pass: edges
  for (TypedValue &target : targets) {
    if (target.type() == TypedValue::Type::Edge) {
      db_.remove_edge(target.Value<EdgeAccessor>());
    }
  }

  // second pass: vertices
  for (TypedValue &target : targets) {
    const auto target_type = target.type();
    // edges were already handled in the first pass
    if (target_type == TypedValue::Type::Edge) continue;
    // make sure nothing except vertices and edges is being deleted
    if (target_type != TypedValue::Type::Vertex) {
      throw TypedValueException("Can only delete edges and vertices");
    }

    VertexAccessor &vertex = target.Value<VertexAccessor>();
    // necessary because an edge deletion could have updated the vertex
    vertex.SwitchNew();
    if (self_.detach_) {
      db_.detach_remove_vertex(vertex);
    } else if (!db_.remove_vertex(vertex)) {
      throw QueryRuntimeException(
          "Failed to remove vertex because of it's existing "
          "connections. Consider using DETACH DELETE.");
    }
  }

  return true;
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by forwarding Reset() to the input cursor.
void Delete::DeleteCursor::Reset() {
  input_cursor_->Reset();
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Stores the input operator, the property lookup that is assigned to (lhs)
// and the expression providing the new value (rhs).
SetProperty::SetProperty(const std::shared_ptr<LogicalOperator> &input,
                         PropertyLookup *lhs, Expression *rhs)
    : input_(input),
      lhs_(lhs),
      rhs_(rhs) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines SetProperty::Accept through the ACCEPT_WITH_INPUT macro.
ACCEPT_WITH_INPUT(SetProperty)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates a SetPropertyCursor bound to this operator and the given accessor.
std::unique_ptr<Cursor> SetProperty::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<SetPropertyCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Keeps references to the owning operator and the accessor, and obtains a
// cursor from the operator's input.
SetProperty::SetPropertyCursor::SetPropertyCursor(const SetProperty &self,
                                                  GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
bool SetProperty::SetPropertyCursor::Pull(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
if (!input_cursor_->Pull(frame, symbol_table)) return false;
|
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
// Set, just like Create needs to see the latest changes.
|
2017-04-25 21:38:46 +08:00
|
|
|
ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::NEW);
|
2017-03-30 17:15:57 +08:00
|
|
|
self_.lhs_->expression_->Accept(evaluator);
|
|
|
|
TypedValue lhs = evaluator.PopBack();
|
|
|
|
self_.rhs_->Accept(evaluator);
|
|
|
|
TypedValue rhs = evaluator.PopBack();
|
|
|
|
|
|
|
|
// TODO the following code uses implicit TypedValue to PropertyValue
|
|
|
|
// conversion which throws a TypedValueException if impossible
|
|
|
|
// this is correct, but the end user will get a not-very-informative
|
|
|
|
// error message, so address this when improving error feedback
|
|
|
|
|
|
|
|
switch (lhs.type()) {
|
|
|
|
case TypedValue::Type::Vertex:
|
|
|
|
lhs.Value<VertexAccessor>().PropsSet(self_.lhs_->property_, rhs);
|
|
|
|
break;
|
|
|
|
case TypedValue::Type::Edge:
|
|
|
|
lhs.Value<EdgeAccessor>().PropsSet(self_.lhs_->property_, rhs);
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
throw QueryRuntimeException(
|
|
|
|
"Properties can only be set on Vertices and Edges");
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by resetting the input cursor it pulls from.
void SetProperty::SetPropertyCursor::Reset() { input_cursor_->Reset(); }
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Builds a SetProperties operator.
//
// @param input        operator whose output rows this operator consumes
// @param input_symbol symbol of the frame value whose properties are set
// @param rhs          expression evaluated to obtain the source of properties
// @param op           how properties are applied (Pull's helper clears the
//                     existing ones first when this is Op::REPLACE)
SetProperties::SetProperties(const std::shared_ptr<LogicalOperator> &input,
                             Symbol input_symbol, Expression *rhs, Op op)
    : input_(input),
      input_symbol_(input_symbol),
      rhs_(rhs),
      op_(op) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines SetProperties::Accept using the default implementation: the visitor
// visits this operator and then its input_ operator.
ACCEPT_WITH_INPUT(SetProperties)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> SetProperties::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<SetPropertiesCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Binds the cursor to its operator and database accessor, and creates the
// cursor of the input operator it will pull rows from.
SetProperties::SetPropertiesCursor::SetPropertiesCursor(
    const SetProperties &self, GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
|
|
|
|
|
|
|
bool SetProperties::SetPropertiesCursor::Pull(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
if (!input_cursor_->Pull(frame, symbol_table)) return false;
|
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
TypedValue &lhs = frame[self_.input_symbol_];
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
// Set, just like Create needs to see the latest changes.
|
2017-04-25 21:38:46 +08:00
|
|
|
ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::NEW);
|
2017-03-30 17:15:57 +08:00
|
|
|
self_.rhs_->Accept(evaluator);
|
|
|
|
TypedValue rhs = evaluator.PopBack();
|
|
|
|
|
|
|
|
switch (lhs.type()) {
|
|
|
|
case TypedValue::Type::Vertex:
|
|
|
|
Set(lhs.Value<VertexAccessor>(), rhs);
|
|
|
|
break;
|
|
|
|
case TypedValue::Type::Edge:
|
|
|
|
Set(lhs.Value<EdgeAccessor>(), rhs);
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
throw QueryRuntimeException(
|
|
|
|
"Properties can only be set on Vertices and Edges");
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by resetting the input cursor it pulls from.
void SetProperties::SetPropertiesCursor::Reset() { input_cursor_->Reset(); }
|
|
|
|
|
2017-03-30 17:15:57 +08:00
|
|
|
template <typename TRecordAccessor>
|
|
|
|
void SetProperties::SetPropertiesCursor::Set(TRecordAccessor &record,
|
2017-04-24 16:16:53 +08:00
|
|
|
const TypedValue &rhs) const {
|
2017-04-03 20:32:29 +08:00
|
|
|
record.SwitchNew();
|
2017-03-30 17:15:57 +08:00
|
|
|
if (self_.op_ == Op::REPLACE) record.PropsClear();
|
|
|
|
|
|
|
|
auto set_props = [&record](const auto &properties) {
|
|
|
|
for (const auto &kv : properties) record.PropsSet(kv.first, kv.second);
|
|
|
|
};
|
|
|
|
|
|
|
|
switch (rhs.type()) {
|
|
|
|
case TypedValue::Type::Edge:
|
|
|
|
set_props(rhs.Value<EdgeAccessor>().Properties());
|
|
|
|
break;
|
|
|
|
case TypedValue::Type::Vertex:
|
|
|
|
set_props(rhs.Value<VertexAccessor>().Properties());
|
|
|
|
break;
|
|
|
|
case TypedValue::Type::Map: {
|
|
|
|
// TODO the following code uses implicit TypedValue to PropertyValue
|
|
|
|
// conversion which throws a TypedValueException if impossible
|
|
|
|
// this is correct, but the end user will get a not-very-informative
|
|
|
|
// error message, so address this when improving error feedback
|
|
|
|
for (const auto &kv : rhs.Value<std::map<std::string, TypedValue>>())
|
|
|
|
record.PropsSet(db_.property(kv.first), kv.second);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
throw QueryRuntimeException(
|
|
|
|
"Can only set Vertices, Edges and maps as properties");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// instantiate the SetProperties function with concrete TRecordAccessor types
|
|
|
|
template void SetProperties::SetPropertiesCursor::Set(
|
2017-04-24 16:16:53 +08:00
|
|
|
RecordAccessor<Vertex> &record, const TypedValue &rhs) const;
|
2017-03-30 17:15:57 +08:00
|
|
|
template void SetProperties::SetPropertiesCursor::Set(
|
2017-04-24 16:16:53 +08:00
|
|
|
RecordAccessor<Edge> &record, const TypedValue &rhs) const;
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Builds a SetLabels operator.
//
// @param input        operator whose output rows this operator consumes
// @param input_symbol symbol of the frame value (a vertex) receiving labels
// @param labels       labels added to that vertex on every pulled row
SetLabels::SetLabels(const std::shared_ptr<LogicalOperator> &input,
                     Symbol input_symbol,
                     const std::vector<GraphDbTypes::Label> &labels)
    : input_(input),
      input_symbol_(input_symbol),
      labels_(labels) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines SetLabels::Accept using the default implementation: the visitor
// visits this operator and then its input_ operator.
ACCEPT_WITH_INPUT(SetLabels)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> SetLabels::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<SetLabelsCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Binds the cursor to its operator and creates the cursor of the input
// operator it will pull rows from.
SetLabels::SetLabelsCursor::SetLabelsCursor(const SetLabels &self,
                                            GraphDbAccessor &db)
    : self_(self),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Pulls one row from the input and adds all of the operator's labels to the
// vertex stored under input_symbol_ on the frame.
//
// Returns false when the input is exhausted, true otherwise.
bool SetLabels::SetLabelsCursor::Pull(Frame &frame,
                                      const SymbolTable &symbol_table) {
  const bool has_row = input_cursor_->Pull(frame, symbol_table);
  if (!has_row) return false;

  auto &vertex = frame[self_.input_symbol_].Value<VertexAccessor>();
  // Work on the new version of the vertex before modifying it.
  vertex.SwitchNew();
  for (auto label : self_.labels_) {
    vertex.add_label(label);
  }

  return true;
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by resetting the input cursor it pulls from.
void SetLabels::SetLabelsCursor::Reset() { input_cursor_->Reset(); }
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Builds a RemoveProperty operator.
//
// @param input operator whose output rows this operator consumes
// @param lhs   property lookup naming the record expression and the property
//              to erase
RemoveProperty::RemoveProperty(const std::shared_ptr<LogicalOperator> &input,
                               PropertyLookup *lhs)
    : input_(input),
      lhs_(lhs) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines RemoveProperty::Accept using the default implementation: the
// visitor visits this operator and then its input_ operator.
ACCEPT_WITH_INPUT(RemoveProperty)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> RemoveProperty::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<RemovePropertyCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Binds the cursor to its operator and database accessor, and creates the
// cursor of the input operator it will pull rows from.
RemoveProperty::RemovePropertyCursor::RemovePropertyCursor(
    const RemoveProperty &self, GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
2017-03-30 17:15:57 +08:00
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Pulls one row from the input and erases a single property from the record
// that the lhs expression evaluates to.
//
// Returns false when the input is exhausted, true after the erase.
// Throws QueryRuntimeException if the lhs is neither a Vertex nor an Edge.
bool RemoveProperty::RemovePropertyCursor::Pull(
    Frame &frame, const SymbolTable &symbol_table) {
  const bool has_row = input_cursor_->Pull(frame, symbol_table);
  if (!has_row) return false;

  // Remove, just like Delete needs to see the latest changes.
  ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::NEW);
  self_.lhs_->expression_->Accept(evaluator);
  TypedValue target = evaluator.PopBack();

  const auto target_type = target.type();
  if (target_type == TypedValue::Type::Vertex) {
    target.Value<VertexAccessor>().PropsErase(self_.lhs_->property_);
  } else if (target_type == TypedValue::Type::Edge) {
    target.Value<EdgeAccessor>().PropsErase(self_.lhs_->property_);
  } else {
    // TODO consider throwing a TypedValueException here
    // deal with this when we'll be overhauling error-feedback
    throw QueryRuntimeException(
        "Properties can only be removed on Vertices and Edges");
  }
  return true;
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by resetting the input cursor it pulls from.
void RemoveProperty::RemovePropertyCursor::Reset() { input_cursor_->Reset(); }
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Builds a RemoveLabels operator.
//
// @param input        operator whose output rows this operator consumes
// @param input_symbol symbol of the frame value (a vertex) losing labels
// @param labels       labels removed from that vertex on every pulled row
RemoveLabels::RemoveLabels(const std::shared_ptr<LogicalOperator> &input,
                           Symbol input_symbol,
                           const std::vector<GraphDbTypes::Label> &labels)
    : input_(input),
      input_symbol_(input_symbol),
      labels_(labels) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines RemoveLabels::Accept using the default implementation: the visitor
// visits this operator and then its input_ operator.
ACCEPT_WITH_INPUT(RemoveLabels)
|
2017-03-30 17:15:57 +08:00
|
|
|
|
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> RemoveLabels::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<RemoveLabelsCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Binds the cursor to its operator and creates the cursor of the input
// operator it will pull rows from.
RemoveLabels::RemoveLabelsCursor::RemoveLabelsCursor(const RemoveLabels &self,
                                                     GraphDbAccessor &db)
    : self_(self),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
|
|
|
|
2017-04-04 15:04:26 +08:00
|
|
|
bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-03-30 17:15:57 +08:00
|
|
|
if (!input_cursor_->Pull(frame, symbol_table)) return false;
|
|
|
|
|
2017-04-03 20:32:29 +08:00
|
|
|
TypedValue &vertex_value = frame[self_.input_symbol_];
|
|
|
|
auto &vertex = vertex_value.Value<VertexAccessor>();
|
|
|
|
vertex.SwitchNew();
|
2017-03-30 17:15:57 +08:00
|
|
|
for (auto label : self_.labels_) vertex.remove_label(label);
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets this cursor by resetting the input cursor it pulls from.
void RemoveLabels::RemoveLabelsCursor::Reset() { input_cursor_->Reset(); }
|
|
|
|
|
2017-04-04 15:04:26 +08:00
|
|
|
template <typename TAccessor>
|
|
|
|
ExpandUniquenessFilter<TAccessor>::ExpandUniquenessFilter(
|
|
|
|
const std::shared_ptr<LogicalOperator> &input, Symbol expand_symbol,
|
|
|
|
const std::vector<Symbol> &previous_symbols)
|
|
|
|
: input_(input),
|
|
|
|
expand_symbol_(expand_symbol),
|
|
|
|
previous_symbols_(previous_symbols) {}
|
|
|
|
|
|
|
|
template <typename TAccessor>
|
2017-04-24 16:16:53 +08:00
|
|
|
ACCEPT_WITH_INPUT(ExpandUniquenessFilter<TAccessor>)
|
2017-04-04 15:04:26 +08:00
|
|
|
|
|
|
|
template <typename TAccessor>
|
|
|
|
std::unique_ptr<Cursor> ExpandUniquenessFilter<TAccessor>::MakeCursor(
|
|
|
|
GraphDbAccessor &db) {
|
|
|
|
return std::make_unique<ExpandUniquenessFilterCursor>(*this, db);
|
|
|
|
}
|
|
|
|
|
|
|
|
template <typename TAccessor>
|
|
|
|
ExpandUniquenessFilter<TAccessor>::ExpandUniquenessFilterCursor::
|
2017-04-11 18:29:20 +08:00
|
|
|
ExpandUniquenessFilterCursor(const ExpandUniquenessFilter &self,
|
2017-04-04 15:04:26 +08:00
|
|
|
GraphDbAccessor &db)
|
|
|
|
: self_(self), input_cursor_(self.input_->MakeCursor(db)) {}
|
|
|
|
|
|
|
|
template <typename TAccessor>
|
|
|
|
bool ExpandUniquenessFilter<TAccessor>::ExpandUniquenessFilterCursor::Pull(
|
2017-04-11 18:29:20 +08:00
|
|
|
Frame &frame, const SymbolTable &symbol_table) {
|
2017-04-04 15:04:26 +08:00
|
|
|
auto expansion_ok = [&]() {
|
|
|
|
TypedValue &expand_value = frame[self_.expand_symbol_];
|
|
|
|
TAccessor &expand_accessor = expand_value.Value<TAccessor>();
|
|
|
|
for (const auto &previous_symbol : self_.previous_symbols_) {
|
|
|
|
TypedValue &previous_value = frame[previous_symbol];
|
|
|
|
TAccessor &previous_accessor = previous_value.Value<TAccessor>();
|
|
|
|
if (expand_accessor == previous_accessor) return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
};
|
|
|
|
|
|
|
|
while (input_cursor_->Pull(frame, symbol_table))
|
2017-04-05 21:00:26 +08:00
|
|
|
if (expansion_ok()) return true;
|
2017-04-04 15:04:26 +08:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
template <typename TAccessor>
|
|
|
|
void ExpandUniquenessFilter<TAccessor>::ExpandUniquenessFilterCursor::Reset() {
|
|
|
|
input_cursor_->Reset();
|
|
|
|
}
|
|
|
|
|
2017-04-04 15:04:26 +08:00
|
|
|
// instantiations of the ExpandUniquenessFilter template class
|
|
|
|
// we only ever need these two
|
|
|
|
template class ExpandUniquenessFilter<VertexAccessor>;
|
|
|
|
template class ExpandUniquenessFilter<EdgeAccessor>;
|
|
|
|
|
2017-04-05 21:00:26 +08:00
|
|
|
namespace {
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Helper function for recursively reconstructing all the accessors in the
|
|
|
|
* given TypedValue.
|
|
|
|
*/
|
|
|
|
void ReconstructTypedValue(TypedValue &value) {
|
|
|
|
switch (value.type()) {
|
|
|
|
case TypedValue::Type::Vertex:
|
2017-04-18 17:50:01 +08:00
|
|
|
if (!value.Value<VertexAccessor>().Reconstruct())
|
|
|
|
throw QueryRuntimeException(
|
|
|
|
"Vertex invalid after WITH clause, (most likely deleted by a "
|
|
|
|
"preceeding DELETE clause)");
|
2017-04-05 21:00:26 +08:00
|
|
|
break;
|
|
|
|
case TypedValue::Type::Edge:
|
2017-04-18 17:50:01 +08:00
|
|
|
if (!value.Value<VertexAccessor>().Reconstruct())
|
|
|
|
throw QueryRuntimeException(
|
|
|
|
"Edge invalid after WITH clause, (most likely deleted by a "
|
|
|
|
"preceeding DELETE clause)");
|
2017-04-05 21:00:26 +08:00
|
|
|
break;
|
|
|
|
case TypedValue::Type::List:
|
|
|
|
for (TypedValue &inner_value : value.Value<std::vector<TypedValue>>())
|
|
|
|
ReconstructTypedValue(inner_value);
|
|
|
|
break;
|
|
|
|
case TypedValue::Type::Map:
|
|
|
|
for (auto &kv : value.Value<std::map<std::string, TypedValue>>())
|
|
|
|
ReconstructTypedValue(kv.second);
|
|
|
|
break;
|
2017-04-11 21:11:48 +08:00
|
|
|
case TypedValue::Type::Path:
|
|
|
|
// TODO implement path reconstruct?
|
2017-04-19 21:52:20 +08:00
|
|
|
throw utils::NotYetImplemented("Path reconstruction not yet supported");
|
2017-04-05 21:00:26 +08:00
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Builds an Accumulate operator.
//
// @param input           operator whose output rows are accumulated
// @param symbols         symbols whose frame values are cached per row
// @param advance_command whether the cursor advances the database command
//                        once all input has been pulled
Accumulate::Accumulate(const std::shared_ptr<LogicalOperator> &input,
                       const std::vector<Symbol> &symbols, bool advance_command)
    : input_(input),
      symbols_(symbols),
      advance_command_(advance_command) {}
|
2017-04-05 16:34:10 +08:00
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines Accumulate::Accept using the default implementation: the visitor
// visits this operator and then its input_ operator.
ACCEPT_WITH_INPUT(Accumulate)
|
|
|
|
|
2017-04-05 16:34:10 +08:00
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> Accumulate::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<Accumulate::AccumulateCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-11 18:29:20 +08:00
|
|
|
// Binds the cursor to its operator and database accessor, and creates the
// cursor of the input operator it will pull rows from.
Accumulate::AccumulateCursor::AccumulateCursor(const Accumulate &self,
                                               GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self.input_->MakeCursor(db)) {}
|
2017-04-05 16:34:10 +08:00
|
|
|
|
2017-04-05 21:00:26 +08:00
|
|
|
bool Accumulate::AccumulateCursor::Pull(Frame &frame,
|
2017-04-11 18:29:20 +08:00
|
|
|
const SymbolTable &symbol_table) {
|
2017-04-05 21:00:26 +08:00
|
|
|
// cache all the input
|
|
|
|
if (!pulled_all_input_) {
|
|
|
|
while (input_cursor_->Pull(frame, symbol_table)) {
|
|
|
|
cache_.emplace_back();
|
|
|
|
auto &row = cache_.back();
|
|
|
|
for (const Symbol &symbol : self_.symbols_)
|
|
|
|
row.emplace_back(frame[symbol]);
|
|
|
|
}
|
|
|
|
pulled_all_input_ = true;
|
|
|
|
cache_it_ = cache_.begin();
|
|
|
|
|
|
|
|
if (self_.advance_command_) {
|
|
|
|
db_.advance_command();
|
|
|
|
for (auto &row : cache_)
|
|
|
|
for (auto &col : row) ReconstructTypedValue(col);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (cache_it_ == cache_.end()) return false;
|
|
|
|
auto row_it = (cache_it_++)->begin();
|
|
|
|
for (const Symbol &symbol : self_.symbols_) frame[symbol] = *row_it++;
|
|
|
|
return true;
|
2017-04-05 16:34:10 +08:00
|
|
|
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets the input cursor and discards all cached rows, so that the next Pull
// accumulates the input again.
void Accumulate::AccumulateCursor::Reset() {
  input_cursor_->Reset();
  pulled_all_input_ = false;
  cache_.clear();
  // keep the iterator consistent with the (now empty) cache
  cache_it_ = cache_.begin();
}
|
|
|
|
|
2017-04-06 15:31:02 +08:00
|
|
|
// Builds an Aggregate operator.
//
// @param input        operator whose output rows are aggregated; when empty a
//                     Once operator is used instead
// @param aggregations aggregation elements to compute
// @param group_by     expressions whose values form the grouping key
// @param remember     symbols whose frame values are remembered per group
Aggregate::Aggregate(const std::shared_ptr<LogicalOperator> &input,
                     const std::vector<Aggregate::Element> &aggregations,
                     const std::vector<Expression *> &group_by,
                     const std::vector<Symbol> &remember)
    : input_(input ? input : std::make_shared<Once>()),
      aggregations_(aggregations),
      group_by_(group_by),
      remember_(remember) {
}
|
2017-04-06 15:31:02 +08:00
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines Aggregate::Accept using the default implementation: the visitor
// visits this operator and then its input_ operator.
ACCEPT_WITH_INPUT(Aggregate)
|
2017-04-06 15:31:02 +08:00
|
|
|
|
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> Aggregate::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor =
      std::make_unique<AggregateCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
|
|
|
// Binds the cursor to its operator and database accessor, and creates the
// cursor of the input operator it will pull rows from.
Aggregate::AggregateCursor::AggregateCursor(Aggregate &self,
                                            GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
2017-04-11 21:11:48 +08:00
|
|
|
|
|
|
|
// On the first call processes the whole input into per-group aggregation
// state. Every call (including the first) then places the aggregated and the
// remembered values of one group on the frame.
//
// Returns false once every group has been handed out.
bool Aggregate::AggregateCursor::Pull(Frame &frame,
                                      const SymbolTable &symbol_table) {
  if (!pulled_all_input_) {
    ProcessAll(frame, symbol_table);
    pulled_all_input_ = true;
    aggregation_it_ = aggregation_.begin();
  }

  if (aggregation_it_ == aggregation_.end()) return false;

  auto &group = aggregation_it_->second;

  // place aggregation values on the frame
  auto value_it = group.values_.begin();
  for (const auto &element : self_.aggregations_) {
    frame[std::get<2>(element)] = *value_it;
    value_it++;
  }

  // place remember values on the frame
  auto remembered_it = group.remember_.begin();
  for (const Symbol &remember_sym : self_.remember_) {
    frame[remember_sym] = *remembered_it;
    remembered_it++;
  }

  aggregation_it_++;
  return true;
}
|
|
|
|
|
2017-04-13 22:47:11 +08:00
|
|
|
// Drains the input cursor, feeding every row into ProcessOne, and then
// finalizes AVG aggregations by dividing the accumulated sums by their counts.
//
// @param frame        frame the input rows are pulled into
// @param symbol_table symbol table used for pulling and evaluation
void Aggregate::AggregateCursor::ProcessAll(Frame &frame,
                                            const SymbolTable &symbol_table) {
  ExpressionEvaluator evaluator(frame, symbol_table, db_, GraphView::NEW);
  while (input_cursor_->Pull(frame, symbol_table))
    ProcessOne(frame, symbol_table, evaluator);

  // calculate AVG aggregations (so far they have only been summed)
  // the index is unsigned to match the container's size() in the comparison
  for (std::size_t pos = 0; pos < self_.aggregations_.size(); ++pos) {
    if (std::get<1>(self_.aggregations_[pos]) != Aggregation::Op::AVG) continue;
    for (auto &kv : aggregation_) {
      AggregationValue &agg_value = kv.second;
      int count = agg_value.counts_[pos];
      // a count of zero means only Null inputs were seen, leave the value as is
      if (count > 0)
        agg_value.values_[pos] =
            agg_value.values_[pos] / static_cast<double>(count);
    }
  }
}
|
|
|
|
|
2017-04-13 22:47:11 +08:00
|
|
|
// Processes the row currently on the frame: evaluates the group-by
// expressions to find (or create) the row's group, makes sure the group's
// state is initialized and then updates it with the row's values.
void Aggregate::AggregateCursor::ProcessOne(Frame &frame,
                                            const SymbolTable &symbol_table,
                                            ExpressionEvaluator &evaluator) {
  // evaluate the grouping key of this row
  std::list<TypedValue> group_key;
  for (Expression *group_expression : self_.group_by_) {
    group_expression->Accept(evaluator);
    group_key.emplace_back(evaluator.PopBack());
  }

  AggregationValue &group_state = aggregation_[group_key];
  EnsureInitialized(frame, group_state);
  Update(frame, symbol_table, evaluator, group_state);
}
|
|
|
|
|
2017-04-11 21:11:48 +08:00
|
|
|
void Aggregate::AggregateCursor::EnsureInitialized(
|
2017-04-24 16:16:53 +08:00
|
|
|
Frame &frame,
|
|
|
|
Aggregate::AggregateCursor::AggregationValue &agg_value) const {
|
2017-04-11 21:11:48 +08:00
|
|
|
if (agg_value.values_.size() > 0) return;
|
|
|
|
|
2017-04-12 21:47:55 +08:00
|
|
|
for (const auto &agg_elem : self_.aggregations_) {
|
|
|
|
if (std::get<1>(agg_elem) == Aggregation::Op::COUNT)
|
|
|
|
agg_value.values_.emplace_back(TypedValue(0));
|
|
|
|
else
|
|
|
|
agg_value.values_.emplace_back(TypedValue::Null);
|
|
|
|
}
|
2017-04-11 21:11:48 +08:00
|
|
|
agg_value.counts_.resize(self_.aggregations_.size(), 0);
|
|
|
|
|
|
|
|
for (const Symbol &remember_sym : self_.remember_)
|
|
|
|
agg_value.remember_.push_back(frame[remember_sym]);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Updates the state of one group with the row currently on the frame.
//
// For every aggregation its input expression is evaluated. Null inputs are
// skipped. The first non-Null input is type-checked and stored (COUNT stores
// 1); later inputs are folded into the stored value according to the
// aggregation op. AVG only sums here, the division happens once all input has
// been processed.
void Aggregate::AggregateCursor::Update(
    Frame &frame, const SymbolTable &symbol_table,
    ExpressionEvaluator &evaluator,
    Aggregate::AggregateCursor::AggregationValue &agg_value) {
  debug_assert(self_.aggregations_.size() == agg_value.values_.size(),
               "Inappropriate AggregationValue.values_ size");
  debug_assert(self_.aggregations_.size() == agg_value.counts_.size(),
               "Inappropriate AggregationValue.counts_ size");

  // counts, values and aggregation info are walked in lockstep
  auto seen_it = agg_value.counts_.begin();
  auto accumulated_it = agg_value.values_.begin();
  auto element_it = self_.aggregations_.begin();
  for (; seen_it < agg_value.counts_.end();
       seen_it++, accumulated_it++, element_it++) {
    std::get<0>(*element_it)->Accept(evaluator);
    TypedValue input = evaluator.PopBack();

    // Aggregations skip Null input values.
    if (input.IsNull()) continue;

    const auto &op = std::get<1>(*element_it);
    auto &seen = *seen_it;
    auto &accumulated = *accumulated_it;

    seen += 1;
    if (seen == 1) {
      // first value, nothing to aggregate. check type, set and continue.
      if (op == Aggregation::Op::MIN || op == Aggregation::Op::MAX) {
        EnsureOkForMinMax(input);
      } else if (op == Aggregation::Op::SUM || op == Aggregation::Op::AVG) {
        EnsureOkForAvgSum(input);
      }
      // COUNT needs no type check
      accumulated = op == Aggregation::Op::COUNT ? 1 : input;
      continue;
    }

    // aggregation of existing values
    switch (op) {
      case Aggregation::Op::COUNT: {
        accumulated = seen;
        break;
      }
      case Aggregation::Op::MIN: {
        EnsureOkForMinMax(input);
        // TODO an illegal comparison here will throw a TypedValueException
        // consider catching and throwing something else
        TypedValue is_smaller = input < accumulated;
        // since we skip nulls we either have a valid comparison, or
        // an exception was just thrown above
        // safe to assume a bool TypedValue
        if (is_smaller.Value<bool>()) accumulated = input;
        break;
      }
      case Aggregation::Op::MAX: {
        // all comments as for Op::Min
        EnsureOkForMinMax(input);
        TypedValue is_greater = input > accumulated;
        if (is_greater.Value<bool>()) accumulated = input;
        break;
      }
      case Aggregation::Op::AVG:
      // for averaging we sum first and divide by count once all
      // the input has been processed
      case Aggregation::Op::SUM: {
        EnsureOkForAvgSum(input);
        accumulated = accumulated + input;
        break;
      }
    }  // end switch over Aggregation::Op enum
  }    // end loop over all aggregations
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets the input cursor and discards all aggregation state, so that the
// next Pull processes the input again.
void Aggregate::AggregateCursor::Reset() {
  input_cursor_->Reset();
  pulled_all_input_ = false;
  aggregation_.clear();
  // keep the iterator consistent with the (now empty) aggregation state
  aggregation_it_ = aggregation_.begin();
}
|
|
|
|
|
|
|
|
void Aggregate::AggregateCursor::EnsureOkForMinMax(
|
|
|
|
const TypedValue &value) const {
|
2017-04-11 21:11:48 +08:00
|
|
|
switch (value.type()) {
|
|
|
|
case TypedValue::Type::Bool:
|
|
|
|
case TypedValue::Type::Int:
|
|
|
|
case TypedValue::Type::Double:
|
|
|
|
case TypedValue::Type::String:
|
|
|
|
return;
|
|
|
|
default:
|
|
|
|
// TODO consider better error feedback
|
|
|
|
throw TypedValueException(
|
|
|
|
"Only Bool, Int, Double and String properties are allowed in "
|
|
|
|
"MIN and MAX aggregations");
|
|
|
|
}
|
|
|
|
}
|
2017-04-24 16:16:53 +08:00
|
|
|
void Aggregate::AggregateCursor::EnsureOkForAvgSum(
|
|
|
|
const TypedValue &value) const {
|
2017-04-11 21:11:48 +08:00
|
|
|
switch (value.type()) {
|
|
|
|
case TypedValue::Type::Int:
|
|
|
|
case TypedValue::Type::Double:
|
|
|
|
return;
|
|
|
|
default:
|
|
|
|
// TODO consider better error feedback
|
|
|
|
throw TypedValueException(
|
|
|
|
"Only numeric properties allowed in SUM and AVG aggregations");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
bool Aggregate::AggregateCursor::TypedValueListEqual::operator()(
|
|
|
|
const std::list<TypedValue> &left,
|
|
|
|
const std::list<TypedValue> &right) const {
|
|
|
|
return std::equal(left.begin(), left.end(), right.begin(),
|
|
|
|
TypedValue::BoolEqual{});
|
2017-04-06 15:31:02 +08:00
|
|
|
}
|
|
|
|
|
2017-04-18 21:19:42 +08:00
|
|
|
// Builds a Skip operator.
//
// @param input      operator whose leading output rows are skipped
// @param expression expression evaluated to obtain the number of rows to skip
Skip::Skip(const std::shared_ptr<LogicalOperator> &input,
           Expression *expression)
    : input_(input),
      expression_(expression) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines Skip::Accept using the default implementation: the visitor visits
// this operator and then its input_ operator.
ACCEPT_WITH_INPUT(Skip)
|
2017-04-18 21:19:42 +08:00
|
|
|
|
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> Skip::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<SkipCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-26 22:12:39 +08:00
|
|
|
std::vector<Symbol> Skip::OutputSymbols(const SymbolTable &symbol_table) {
|
|
|
|
// Propagate this to potential Produce.
|
|
|
|
return input_->OutputSymbols(symbol_table);
|
|
|
|
}
|
|
|
|
|
2017-04-18 21:19:42 +08:00
|
|
|
// Binds the cursor to its operator and database accessor, and creates the
// cursor of the input operator it will pull rows from.
Skip::SkipCursor::SkipCursor(Skip &self, GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
2017-04-18 21:19:42 +08:00
|
|
|
|
|
|
|
// Pulls from the input, discarding rows until the number of rows given by the
// skip expression has been skipped. The expression is evaluated after the
// first successful input pull.
//
// Returns true when a non-skipped row is on the frame, false once the input
// is exhausted.
// Throws QueryRuntimeException if the expression result is not an int or is
// negative.
bool Skip::SkipCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
  for (;;) {
    if (!input_cursor_->Pull(frame, symbol_table)) return false;

    if (to_skip_ == -1) {
      // first successful pull from the input
      // evaluate the skip expression
      ExpressionEvaluator evaluator(frame, symbol_table, db_);
      self_.expression_->Accept(evaluator);
      TypedValue skip_count = evaluator.PopBack();
      if (skip_count.type() != TypedValue::Type::Int) {
        throw QueryRuntimeException("Result of SKIP expression must be an int");
      }

      to_skip_ = skip_count.Value<int64_t>();
      if (to_skip_ < 0) {
        throw QueryRuntimeException(
            "Result of SKIP expression must be greater or equal to zero");
      }
    }

    // compare using the count before this row, then account for the row
    const bool still_skipping = skipped_ < to_skip_;
    skipped_++;
    if (!still_skipping) return true;
  }
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets the input cursor and the skip bookkeeping; to_skip_ goes back to -1,
// which makes the next Pull evaluate the skip expression again.
void Skip::SkipCursor::Reset() {
  input_cursor_->Reset();
  skipped_ = 0;
  to_skip_ = -1;
}
|
|
|
|
|
2017-04-18 21:19:42 +08:00
|
|
|
// Builds a Limit operator.
//
// @param input      operator whose output rows are limited
// @param expression expression evaluated to obtain the maximum number of rows
Limit::Limit(const std::shared_ptr<LogicalOperator> &input,
             Expression *expression)
    : input_(input),
      expression_(expression) {}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines Limit::Accept using the default implementation: the visitor visits
// this operator and then its input_ operator.
ACCEPT_WITH_INPUT(Limit)
|
2017-04-18 21:19:42 +08:00
|
|
|
|
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> Limit::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<LimitCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-26 22:12:39 +08:00
|
|
|
std::vector<Symbol> Limit::OutputSymbols(const SymbolTable &symbol_table) {
|
|
|
|
// Propagate this to potential Produce.
|
|
|
|
return input_->OutputSymbols(symbol_table);
|
|
|
|
}
|
|
|
|
|
2017-04-18 21:19:42 +08:00
|
|
|
// Binds the cursor to its operator and database accessor, and creates the
// cursor of the input operator it will pull rows from.
Limit::LimitCursor::LimitCursor(Limit &self, GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
2017-04-18 21:19:42 +08:00
|
|
|
|
|
|
|
// Pulls from the input as long as fewer rows than the limit have been
// requested. The limit expression is evaluated on the first call.
//
// Returns false once the limit is reached, otherwise the result of pulling
// from the input.
// Throws QueryRuntimeException if the expression result is not an int or is
// negative.
bool Limit::LimitCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
  // we need to evaluate the limit expression before the first input Pull
  // because it might be 0 and thereby we shouldn't Pull from input at all
  // we can do this before Pulling from the input because the limit expression
  // is not allowed to contain any identifiers
  const bool limit_unknown = limit_ == -1;
  if (limit_unknown) {
    ExpressionEvaluator evaluator(frame, symbol_table, db_);
    self_.expression_->Accept(evaluator);
    TypedValue limit_value = evaluator.PopBack();
    if (limit_value.type() != TypedValue::Type::Int) {
      throw QueryRuntimeException("Result of LIMIT expression must be an int");
    }

    limit_ = limit_value.Value<int64_t>();
    if (limit_ < 0) {
      throw QueryRuntimeException(
          "Result of LIMIT expression must be greater or equal to zero");
    }
  }

  // check we have not exceeded the limit before pulling
  // (the attempt is counted whether or not the limit was already reached)
  const bool limit_reached = pulled_ >= limit_;
  pulled_++;
  if (limit_reached) return false;

  return input_cursor_->Pull(frame, symbol_table);
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Resets the input cursor and the limit bookkeeping; limit_ goes back to -1,
// which makes the next Pull evaluate the limit expression again.
void Limit::LimitCursor::Reset() {
  input_cursor_->Reset();
  pulled_ = 0;
  limit_ = -1;
}
|
|
|
|
|
2017-04-20 19:31:18 +08:00
|
|
|
// Builds an OrderBy operator.
//
// The (ordering, expression) pairs are split: the expressions are stored in
// order_by_ and the orderings are used to construct the list comparator.
//
// @param input          operator whose output rows are sorted
// @param order_by       pairs of ordering and the expression to order by
// @param output_symbols symbols whose frame values are cached per row
OrderBy::OrderBy(const std::shared_ptr<LogicalOperator> &input,
                 const std::vector<std::pair<Ordering, Expression *>> &order_by,
                 const std::vector<Symbol> &output_symbols)
    : input_(input), output_symbols_(output_symbols) {
  const auto pair_count = order_by.size();

  // split the order_by vector into two vectors of orderings and expressions
  std::vector<Ordering> directions;
  directions.reserve(pair_count);
  order_by_.reserve(pair_count);
  for (const auto &[direction, expression] : order_by) {
    directions.emplace_back(direction);
    order_by_.emplace_back(expression);
  }
  compare_ = TypedValueListCompare(directions);
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Defines OrderBy::Accept using the default implementation: the visitor
// visits this operator and then its input_ operator.
ACCEPT_WITH_INPUT(OrderBy)
|
2017-04-20 19:31:18 +08:00
|
|
|
|
|
|
|
// Creates the cursor that executes this operator using the given accessor.
std::unique_ptr<Cursor> OrderBy::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<OrderByCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
2017-04-26 22:12:39 +08:00
|
|
|
std::vector<Symbol> OrderBy::OutputSymbols(const SymbolTable &symbol_table) {
|
|
|
|
// Propagate this to potential Produce.
|
|
|
|
return input_->OutputSymbols(symbol_table);
|
|
|
|
}
|
|
|
|
|
2017-04-20 19:31:18 +08:00
|
|
|
// Binds the cursor to its operator and database accessor, and creates the
// cursor of the input operator it will pull rows from.
OrderBy::OrderByCursor::OrderByCursor(OrderBy &self, GraphDbAccessor &db)
    : self_(self),
      db_(db),
      input_cursor_(self_.input_->MakeCursor(db)) {}
|
2017-04-20 19:31:18 +08:00
|
|
|
|
|
|
|
// On the first call drains the whole input, caching for every row the values
// of the order-by expressions and of the output symbols, and sorts the cache
// by the order-by values. Every call (including the first) then places the
// output values of one cached row on the frame.
//
// Returns false once all cached rows have been handed out.
bool OrderBy::OrderByCursor::Pull(Frame &frame,
                                  const SymbolTable &symbol_table) {
  if (!did_pull_all_) {
    ExpressionEvaluator evaluator(frame, symbol_table, db_);
    while (input_cursor_->Pull(frame, symbol_table)) {
      // collect the order_by elements
      std::list<TypedValue> sort_key;
      for (auto order_expression : self_.order_by_) {
        order_expression->Accept(evaluator);
        sort_key.emplace_back(evaluator.PopBack());
      }

      // collect the output elements
      std::list<TypedValue> row_values;
      for (const Symbol &output_sym : self_.output_symbols_) {
        row_values.emplace_back(frame[output_sym]);
      }

      cache_.emplace_back(sort_key, row_values);
    }

    // order the cached rows by their sort keys
    auto by_sort_key = [this](const auto &lhs_entry, const auto &rhs_entry) {
      return self_.compare_(lhs_entry.first, rhs_entry.first);
    };
    std::sort(cache_.begin(), cache_.end(), by_sort_key);

    did_pull_all_ = true;
    cache_it_ = cache_.begin();
  }

  if (cache_it_ == cache_.end()) return false;

  // place the output values on the frame
  debug_assert(self_.output_symbols_.size() == cache_it_->second.size(),
               "Number of values does not match the number of output symbols "
               "in OrderBy");
  auto symbol_it = self_.output_symbols_.begin();
  for (const TypedValue &cached_value : cache_it_->second) {
    frame[*symbol_it] = cached_value;
    symbol_it++;
  }

  cache_it_++;
  return true;
}
|
|
|
|
|
2017-04-24 16:16:53 +08:00
|
|
|
// Returns the cursor to its initial state: the input cursor is reset, the
// cached rows are discarded and the next Pull will drain and sort the input
// again.
void OrderBy::OrderByCursor::Reset() {
  input_cursor_->Reset();

  // drop the cached rows and re-anchor the iterator on the (now empty) cache
  cache_.clear();
  cache_it_ = cache_.begin();

  did_pull_all_ = false;
}
|
|
|
|
|
2017-04-20 19:31:18 +08:00
|
|
|
bool OrderBy::TypedValueCompare(const TypedValue &a, const TypedValue &b) {
|
|
|
|
// in ordering null comes after everything else
|
|
|
|
// at the same time Null is not less that null
|
|
|
|
// first deal with Null < Whatever case
|
|
|
|
if (a.IsNull()) return false;
|
|
|
|
// now deal with NotNull < Null case
|
|
|
|
if (b.IsNull()) return true;
|
|
|
|
|
|
|
|
// comparisons are from this point legal only between values of
|
|
|
|
// the same type, or int+float combinations
|
|
|
|
if ((a.type() != b.type() && !(a.IsNumeric() && b.IsNumeric())))
|
|
|
|
throw QueryRuntimeException(
|
|
|
|
"Can't compare value of type {} to value of type {}", a.type(),
|
|
|
|
b.type());
|
|
|
|
|
|
|
|
switch (a.type()) {
|
|
|
|
case TypedValue::Type::Bool:
|
|
|
|
return !a.Value<bool>() && b.Value<bool>();
|
|
|
|
case TypedValue::Type::Int:
|
|
|
|
if (b.type() == TypedValue::Type::Double)
|
|
|
|
return a.Value<int64_t>() < b.Value<double>();
|
|
|
|
else
|
|
|
|
return a.Value<int64_t>() < b.Value<int64_t>();
|
|
|
|
case TypedValue::Type::Double:
|
|
|
|
if (b.type() == TypedValue::Type::Int)
|
|
|
|
return a.Value<double>() < b.Value<int64_t>();
|
|
|
|
else
|
|
|
|
return a.Value<double>() < b.Value<double>();
|
|
|
|
case TypedValue::Type::String:
|
|
|
|
return a.Value<std::string>() < b.Value<std::string>();
|
|
|
|
case TypedValue::Type::List:
|
|
|
|
case TypedValue::Type::Map:
|
|
|
|
case TypedValue::Type::Vertex:
|
|
|
|
case TypedValue::Type::Edge:
|
|
|
|
case TypedValue::Type::Path:
|
|
|
|
throw QueryRuntimeException(
|
|
|
|
"Comparison is not defined for values of type {}", a.type());
|
|
|
|
default:
|
|
|
|
permanent_fail("Unhandled comparison for types");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
bool OrderBy::TypedValueListCompare::operator()(
|
|
|
|
const std::list<TypedValue> &c1, const std::list<TypedValue> &c2) const {
|
|
|
|
auto c1_it = c1.begin();
|
|
|
|
auto c2_it = c2.begin();
|
|
|
|
// ordering is invalid if there are more elements in the collections
|
|
|
|
// then there are in the ordering_ vector
|
|
|
|
debug_assert(std::distance(c1_it, c1.end()) <= ordering_.size() &&
|
|
|
|
std::distance(c2_it, c2.end()) <= ordering_.size(),
|
|
|
|
"Collections contain more elements then there are orderings");
|
|
|
|
|
|
|
|
auto ordering_it = ordering_.begin();
|
|
|
|
for (; c1_it != c1.end() && c2_it != c2.end();
|
|
|
|
c1_it++, c2_it++, ordering_it++) {
|
|
|
|
if (OrderBy::TypedValueCompare(*c1_it, *c2_it))
|
|
|
|
return *ordering_it == Ordering::ASC;
|
|
|
|
if (OrderBy::TypedValueCompare(*c2_it, *c1_it))
|
|
|
|
return *ordering_it == Ordering::DESC;
|
|
|
|
}
|
|
|
|
|
|
|
|
// at least one collection is exhausted
|
|
|
|
// c1 is less then c2 iff c1 reached the end but c2 didn't
|
|
|
|
return (c1_it == c1.end()) && (c2_it != c2.end());
|
|
|
|
}
|
|
|
|
|
2017-04-25 21:22:21 +08:00
|
|
|
// Constructs a Merge operator.
//
// input        - operator feeding rows into the merge; when it is null a
//                Once operator is used in its place.
// merge_match  - operator tree that is pulled first for every input row.
// merge_create - operator tree that is pulled when merge_match yields nothing
//                for an input row.
Merge::Merge(const std::shared_ptr<LogicalOperator> input,
             const std::shared_ptr<LogicalOperator> merge_match,
             const std::shared_ptr<LogicalOperator> merge_create)
    : input_(input != nullptr ? input : std::make_shared<Once>()),
      merge_match_(merge_match),
      merge_create_(merge_create) {}
|
|
|
|
|
|
|
|
// Visitor entry point. When the visitor's PreVisit accepts this operator, the
// operator itself is visited, followed by its three children (input, match
// branch, create branch, in that order), and finally PostVisit is called.
void Merge::Accept(LogicalOperatorVisitor &visitor) {
  // the visitor declined this subtree
  if (!visitor.PreVisit(*this)) return;

  visitor.Visit(*this);
  input_->Accept(visitor);
  merge_match_->Accept(visitor);
  merge_create_->Accept(visitor);
  visitor.PostVisit(*this);
}
|
|
|
|
|
|
|
|
// Builds a fresh cursor that executes this Merge operator using the given
// database accessor. Ownership of the cursor is handed to the caller.
std::unique_ptr<Cursor> Merge::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<MergeCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
|
|
|
// Creates one child cursor for each of the Merge operator's three branches:
// the input, the match branch and the create branch.
Merge::MergeCursor::MergeCursor(Merge &self, GraphDbAccessor &db)
    : input_cursor_(self.input_->MakeCursor(db)),
      merge_match_cursor_(self.merge_match_->MakeCursor(db)),
      merge_create_cursor_(self.merge_create_->MakeCursor(db)) {}
|
|
|
|
|
|
|
|
// Produces the next merged row.
//
// For every row pulled from the input, the match branch is pulled for as
// long as it keeps succeeding. If it yields nothing at all for a freshly
// pulled input row, the create branch is pulled once instead.
//
// pull_input_ tells whether the next step has to advance the input cursor
// (true) or may keep pulling matches for the current input row (false).
//
// Returns false once the input cursor is exhausted, true otherwise.
bool Merge::MergeCursor::Pull(Frame &frame, const SymbolTable &symbol_table) {
  while (true) {
    if (pull_input_) {
      // input is exhausted, we're done
      if (!input_cursor_->Pull(frame, symbol_table)) return false;

      // a new input row arrived: restart merge_match (its expand iterators
      // maintain state) and merge_create (could have a Once at the beginning)
      merge_match_cursor_->Reset();
      merge_create_cursor_->Reset();
    }

    if (merge_match_cursor_->Pull(frame, symbol_table)) {
      // keep pulling matches for the same input row on the next call
      pull_input_ = false;
      return true;
    }

    if (pull_input_) {
      // the input row was pulled just now and nothing matched it, so create
      bool created = merge_create_cursor_->Pull(frame, symbol_table);
      debug_assert(created, "MergeCreate must never fail");
      return true;
    }

    // merge_match_cursor_ is exhausted after one or more successful pulls,
    // go around again and advance the input
    pull_input_ = true;
  }
}
|
|
|
|
|
|
|
|
void Merge::MergeCursor::Reset() {
|
|
|
|
input_cursor_->Reset();
|
|
|
|
merge_match_cursor_->Reset();
|
|
|
|
merge_create_cursor_->Reset();
|
2017-04-27 16:48:21 +08:00
|
|
|
pull_input_ = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Constructs an Optional operator.
//
// input            - operator feeding rows into this one; when it is null a
//                    Once operator is used in its place.
// optional         - operator tree that is pulled for every input row.
// optional_symbols - symbols that are set to Null on the frame when the
//                    optional branch yields nothing for an input row.
Optional::Optional(const std::shared_ptr<LogicalOperator> &input,
                   const std::shared_ptr<LogicalOperator> &optional,
                   const std::vector<Symbol> &optional_symbols)
    : input_(input != nullptr ? input : std::make_shared<Once>()),
      optional_(optional),
      optional_symbols_(optional_symbols) {}
|
|
|
|
|
|
|
|
// Visitor entry point. When the visitor's PreVisit accepts this operator, the
// operator itself is visited, followed by its input and its optional branch
// (in that order), and finally PostVisit is called.
void Optional::Accept(LogicalOperatorVisitor &visitor) {
  // the visitor declined this subtree
  if (!visitor.PreVisit(*this)) return;

  visitor.Visit(*this);
  input_->Accept(visitor);
  optional_->Accept(visitor);
  visitor.PostVisit(*this);
}
|
|
|
|
|
|
|
|
// Builds a fresh cursor that executes this Optional operator using the given
// database accessor. Ownership of the cursor is handed to the caller.
std::unique_ptr<Cursor> Optional::MakeCursor(GraphDbAccessor &db) {
  std::unique_ptr<Cursor> cursor = std::make_unique<OptionalCursor>(*this, db);
  return cursor;
}
|
|
|
|
|
|
|
|
// Binds the cursor to its owning Optional operator and creates the child
// cursors for the input and for the optional branch.
Optional::OptionalCursor::OptionalCursor(Optional &self, GraphDbAccessor &db)
    : self_(self),
      input_cursor_(self.input_->MakeCursor(db)),
      optional_cursor_(self.optional_->MakeCursor(db)) {}
|
|
|
|
|
|
|
|
// Produces the next row.
//
// For every row pulled from the input, the optional branch is pulled for as
// long as it keeps succeeding. If it yields nothing at all for a freshly
// pulled input row, the optional symbols are set to Null on the frame and the
// row is emitted anyway.
//
// pull_input_ tells whether the next step has to advance the input cursor
// (true) or may keep pulling the optional branch for the current input row
// (false).
//
// Returns false once the input cursor is exhausted, true otherwise.
bool Optional::OptionalCursor::Pull(Frame &frame,
                                    const SymbolTable &symbol_table) {
  while (true) {
    if (pull_input_) {
      // input is exhausted, we're done
      if (!input_cursor_->Pull(frame, symbol_table)) return false;

      // a new input row arrived: restart optional_ (its expand iterators
      // maintain state)
      optional_cursor_->Reset();
    }

    if (optional_cursor_->Pull(frame, symbol_table)) {
      // keep pulling the optional branch for the same input row next time
      pull_input_ = false;
      return true;
    }

    // the optional_ cursor produced nothing on this attempt
    if (pull_input_) {
      // the input row was pulled just now and the optional branch gave
      // nothing for it: emit the row with the optional symbols set to Null.
      // pull_input_ is still true here, so the next call advances the input.
      for (const Symbol &symbol : self_.optional_symbols_) {
        frame[symbol] = TypedValue::Null;
      }
      return true;
    }

    // optional_cursor_ is exhausted after one or more successful pulls,
    // go around again and advance the input
    pull_input_ = true;
  }
}
|
|
|
|
|
|
|
|
// Resets both child cursors and makes sure the next Pull starts by advancing
// the input cursor.
void Optional::OptionalCursor::Reset() {
  // children first ...
  input_cursor_->Reset();
  optional_cursor_->Reset();

  // ... then this cursor's own state
  pull_input_ = true;
}
|
|
|
|
|
2017-04-26 22:12:39 +08:00
|
|
|
} // namespace query::plan
|