Use per_pull and whole execution allocators in Cursors

Reviewers: mtomic, mferencevic, msantl

Reviewed By: mtomic, msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2134
This commit is contained in:
Teon Banek 2019-06-11 09:56:46 +02:00
parent 1253319710
commit 6086257204
5 changed files with 229 additions and 152 deletions
src
tests/benchmark/query

View File

@ -134,6 +134,8 @@ VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info,
ExpressionEvaluator evaluator(frame, context.symbol_table,
context.evaluation_context, context.db_accessor,
GraphView::NEW);
// TODO: PropsSetChecked allocates a PropertyValue, make it use context.memory
// when we update PropertyValue with custom allocator.
for (auto &kv : node_info.properties)
PropsSetChecked(&new_node, kv.first, kv.second->Accept(evaluator));
(*frame)[node_info.symbol] = new_node;
@ -205,6 +207,20 @@ CreateExpand::CreateExpandCursor::CreateExpandCursor(
utils::MemoryResource *mem)
: self_(self), db_(*db), input_cursor_(self.input_->MakeCursor(db, mem)) {}
namespace {
void CreateEdge(const EdgeCreationInfo &edge_info,
database::GraphDbAccessor *dba, VertexAccessor *from,
VertexAccessor *to, Frame *frame,
ExpressionEvaluator *evaluator) {
EdgeAccessor edge = dba->InsertEdge(*from, *to, edge_info.edge_type);
for (auto kv : edge_info.properties)
PropsSetChecked(&edge, kv.first, kv.second->Accept(*evaluator));
(*frame)[edge_info.symbol] = edge;
}
} // namespace
bool CreateExpand::CreateExpandCursor::Pull(Frame &frame,
ExecutionContext &context) {
SCOPED_PROFILE_OP("CreateExpand");
@ -231,17 +247,17 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame,
// create an edge between the two nodes
switch (self_.edge_info_.direction) {
case EdgeAtom::Direction::IN:
CreateEdge(v2, v1, frame, context.symbol_table, evaluator);
CreateEdge(self_.edge_info_, &db_, &v2, &v1, &frame, &evaluator);
break;
case EdgeAtom::Direction::OUT:
CreateEdge(v1, v2, frame, context.symbol_table, evaluator);
CreateEdge(self_.edge_info_, &db_, &v1, &v2, &frame, &evaluator);
break;
case EdgeAtom::Direction::BOTH:
// in the case of an undirected CreateExpand we choose an arbitrary
// direction. this is used in the MERGE clause
// it is not allowed in the CREATE clause, and the semantic
// checker needs to ensure it doesn't reach this point
CreateEdge(v1, v2, frame, context.symbol_table, evaluator);
CreateEdge(self_.edge_info_, &db_, &v1, &v2, &frame, &evaluator);
}
return true;
@ -263,15 +279,6 @@ VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(
}
}
void CreateExpand::CreateExpandCursor::CreateEdge(
VertexAccessor &from, VertexAccessor &to, Frame &frame,
const SymbolTable &symbol_table, ExpressionEvaluator &evaluator) {
EdgeAccessor edge = db_.InsertEdge(from, to, self_.edge_info_.edge_type);
for (auto kv : self_.edge_info_.properties)
PropsSetChecked(&edge, kv.first, kv.second->Accept(evaluator));
frame[self_.edge_info_.symbol] = edge;
}
template <class TVerticesFun>
class ScanAllCursor : public Cursor {
public:
@ -661,11 +668,13 @@ namespace {
* @param vertex - The vertex to expand from.
* @param direction - Expansion direction. All directions (IN, OUT, BOTH)
* are supported.
* @param memory - Used to allocate the result.
* @return See above.
*/
auto ExpandFromVertex(const VertexAccessor &vertex,
EdgeAtom::Direction direction,
const std::vector<storage::EdgeType> &edge_types) {
const std::vector<storage::EdgeType> &edge_types,
utils::MemoryResource *memory) {
// wraps an EdgeAccessor into a pair <accessor, direction>
auto wrapper = [](EdgeAtom::Direction direction, auto &&vertices) {
return iter::imap(
@ -676,7 +685,8 @@ auto ExpandFromVertex(const VertexAccessor &vertex,
};
// prepare a vector of elements we'll pass to the itertools
std::vector<decltype(wrapper(direction, vertex.in()))> chain_elements;
utils::AVector<decltype(wrapper(direction, vertex.in()))> chain_elements(
memory);
if (direction != EdgeAtom::Direction::OUT && vertex.in_degree() > 0) {
auto edges = vertex.in(&edge_types);
@ -693,6 +703,7 @@ auto ExpandFromVertex(const VertexAccessor &vertex,
}
}
// TODO: Investigate whether itertools perform heap allocation?
return iter::chain.from_iterable(std::move(chain_elements));
}
@ -758,15 +769,13 @@ class ExpandVariableCursor : public Cursor {
// a stack of edge iterables corresponding to the level/depth of
// the expansion currently being Pulled
using ExpandEdges = decltype(ExpandFromVertex(std::declval<VertexAccessor>(),
EdgeAtom::Direction::IN,
self_.common_.edge_types));
std::vector<ExpandEdges, utils::Allocator<ExpandEdges>> edges_;
using ExpandEdges = decltype(
ExpandFromVertex(std::declval<VertexAccessor>(), EdgeAtom::Direction::IN,
self_.common_.edge_types, utils::NewDeleteResource()));
utils::AVector<ExpandEdges> edges_;
// an iterator indicating the position in the corresponding edges_ element
std::vector<decltype(edges_.begin()->begin()),
utils::Allocator<decltype(edges_.begin()->begin())>>
edges_it_;
utils::AVector<decltype(edges_.begin()->begin())> edges_it_;
/**
* Helper function that Pulls from the input vertex and
@ -776,8 +785,7 @@ class ExpandVariableCursor : public Cursor {
* is exhausted.
*/
bool PullInput(Frame &frame, ExecutionContext &context) {
// Input Vertex could be null if it is created by a failed optional
// match.
// Input Vertex could be null if it is created by a failed optional match.
// In those cases we skip that input pull and continue with the next.
while (true) {
if (context.db_accessor->should_abort()) throw HintedAbortError();
@ -809,13 +817,15 @@ class ExpandVariableCursor : public Cursor {
if (upper_bound_ > 0) {
SwitchAccessor(vertex, GraphView::OLD);
auto *memory = edges_.get_allocator().GetMemoryResource();
edges_.emplace_back(ExpandFromVertex(vertex, self_.common_.direction,
self_.common_.edge_types));
self_.common_.edge_types, memory));
edges_it_.emplace_back(edges_.back().begin());
}
// reset the frame value to an empty edge list
frame[self_.common_.edge_symbol] = std::vector<TypedValue>();
auto *pull_memory = context.evaluation_context.memory;
frame[self_.common_.edge_symbol] = TypedValue::TVector(pull_memory);
return true;
}
@ -926,8 +936,10 @@ class ExpandVariableCursor : public Cursor {
// edge's expansions onto the stack, if we should continue to expand
if (upper_bound_ > static_cast<int64_t>(edges_.size())) {
SwitchAccessor(current_vertex, GraphView::OLD);
edges_.emplace_back(ExpandFromVertex(
current_vertex, self_.common_.direction, self_.common_.edge_types));
auto *memory = edges_.get_allocator().GetMemoryResource();
edges_.emplace_back(ExpandFromVertex(current_vertex,
self_.common_.direction,
self_.common_.edge_types, memory));
edges_it_.emplace_back(edges_.back().begin());
}
@ -945,9 +957,7 @@ class STShortestPathCursor : public query::plan::Cursor {
STShortestPathCursor(const ExpandVariable &self,
database::GraphDbAccessor *dba,
utils::MemoryResource *mem)
: mem_(mem),
self_(self),
input_cursor_(self_.input()->MakeCursor(dba, mem)) {
: self_(self), input_cursor_(self_.input()->MakeCursor(dba, mem)) {
CHECK(self_.common_.existing_node)
<< "s-t shortest path algorithm should only "
"be used when `existing_node` flag is "
@ -961,15 +971,15 @@ class STShortestPathCursor : public query::plan::Cursor {
context.evaluation_context,
context.db_accessor, GraphView::OLD);
while (input_cursor_->Pull(frame, context)) {
auto source_tv = frame[self_.input_symbol_];
auto sink_tv = frame[self_.common_.node_symbol];
const auto &source_tv = frame[self_.input_symbol_];
const auto &sink_tv = frame[self_.common_.node_symbol];
// It is possible that source or sink vertex is Null due to optional
// matching.
if (source_tv.IsNull() || sink_tv.IsNull()) continue;
auto source = source_tv.ValueVertex();
auto sink = sink_tv.ValueVertex();
const auto &source = source_tv.ValueVertex();
const auto &sink = sink_tv.ValueVertex();
int64_t lower_bound =
self_.lower_bound_
@ -997,7 +1007,6 @@ class STShortestPathCursor : public query::plan::Cursor {
void Reset() override { input_cursor_->Reset(); }
private:
utils::MemoryResource *mem_;
const ExpandVariable &self_;
UniqueCursorPtr input_cursor_;
@ -1009,8 +1018,9 @@ class STShortestPathCursor : public query::plan::Cursor {
void ReconstructPath(const VertexAccessor &midpoint,
const VertexEdgeMapT &in_edge,
const VertexEdgeMapT &out_edge, Frame *frame) {
std::vector<TypedValue> result;
const VertexEdgeMapT &out_edge, Frame *frame,
utils::MemoryResource *pull_memory) {
utils::AVector<TypedValue> result(pull_memory);
auto last_vertex = midpoint;
while (true) {
const auto &last_edge = in_edge.at(last_vertex);
@ -1059,24 +1069,21 @@ class STShortestPathCursor : public query::plan::Cursor {
// perform better for real-world like graphs where the expansion front
// grows exponentially, effectively reducing the exponent by half.
auto *pull_memory = evaluator->GetMemoryResource();
// Holds vertices at the current level of expansion from the source
// (sink).
std::vector<VertexAccessor, utils::Allocator<VertexAccessor>>
source_frontier(mem_);
std::vector<VertexAccessor, utils::Allocator<VertexAccessor>> sink_frontier(
mem_);
utils::AVector<VertexAccessor> source_frontier(pull_memory);
utils::AVector<VertexAccessor> sink_frontier(pull_memory);
// Holds vertices we can expand to from `source_frontier`
// (`sink_frontier`).
std::vector<VertexAccessor, utils::Allocator<VertexAccessor>> source_next(
mem_);
std::vector<VertexAccessor, utils::Allocator<VertexAccessor>> sink_next(
mem_);
utils::AVector<VertexAccessor> source_next(pull_memory);
utils::AVector<VertexAccessor> sink_next(pull_memory);
// Maps each vertex we visited expanding from the source (sink) to the
// edge used. Necessary for path reconstruction.
VertexEdgeMapT in_edge(mem_);
VertexEdgeMapT out_edge(mem_);
VertexEdgeMapT in_edge(pull_memory);
VertexEdgeMapT out_edge(pull_memory);
size_t current_length = 0;
@ -1099,7 +1106,8 @@ class STShortestPathCursor : public query::plan::Cursor {
in_edge.emplace(edge.to(), edge);
if (Contains(out_edge, edge.to())) {
if (current_length >= lower_bound) {
ReconstructPath(edge.to(), in_edge, out_edge, frame);
ReconstructPath(edge.to(), in_edge, out_edge, frame,
pull_memory);
return true;
} else {
return false;
@ -1116,7 +1124,8 @@ class STShortestPathCursor : public query::plan::Cursor {
in_edge.emplace(edge.from(), edge);
if (Contains(out_edge, edge.from())) {
if (current_length >= lower_bound) {
ReconstructPath(edge.from(), in_edge, out_edge, frame);
ReconstructPath(edge.from(), in_edge, out_edge, frame,
pull_memory);
return true;
} else {
return false;
@ -1147,7 +1156,8 @@ class STShortestPathCursor : public query::plan::Cursor {
out_edge.emplace(edge.to(), edge);
if (Contains(in_edge, edge.to())) {
if (current_length >= lower_bound) {
ReconstructPath(edge.to(), in_edge, out_edge, frame);
ReconstructPath(edge.to(), in_edge, out_edge, frame,
pull_memory);
return true;
} else {
return false;
@ -1164,7 +1174,8 @@ class STShortestPathCursor : public query::plan::Cursor {
out_edge.emplace(edge.from(), edge);
if (Contains(in_edge, edge.from())) {
if (current_length >= lower_bound) {
ReconstructPath(edge.from(), in_edge, out_edge, frame);
ReconstructPath(edge.from(), in_edge, out_edge, frame,
pull_memory);
return true;
} else {
return false;
@ -1237,7 +1248,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
// populates the to_visit_next_ structure with expansions
// from the given vertex. skips expansions that don't satisfy
// the "where" condition.
auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex) {
auto expand_from_vertex = [this, &expand_pair](const auto &vertex) {
if (self_.common_.direction != EdgeAtom::Direction::IN) {
for (const EdgeAccessor &edge : vertex.out(&self_.common_.edge_types))
expand_pair(edge, edge.to());
@ -1263,7 +1274,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
to_visit_next_.clear();
processed_.clear();
auto vertex_value = frame[self_.input_symbol_];
const auto &vertex_value = frame[self_.input_symbol_];
// it is possible that the vertex is Null due to optional matching
if (vertex_value.IsNull()) continue;
lower_bound_ = self_.lower_bound_
@ -1277,7 +1288,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
if (upper_bound_ < 1 || lower_bound_ > upper_bound_) continue;
auto vertex = vertex_value.Value<VertexAccessor>();
const auto &vertex = vertex_value.Value<VertexAccessor>();
processed_.emplace(vertex, std::nullopt);
expand_from_vertex(vertex);
@ -1291,7 +1302,9 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
to_visit_current_.pop_back();
// create the frame value for the edges
std::vector<TypedValue> edge_list{TypedValue(expansion.first)};
auto *pull_memory = context.evaluation_context.memory;
utils::AVector<TypedValue> edge_list(pull_memory);
edge_list.emplace_back(expansion.first);
auto last_vertex = expansion.second;
while (true) {
const EdgeAccessor &last_edge = edge_list.back().Value<EdgeAccessor>();
@ -1347,12 +1360,8 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
std::optional<EdgeAccessor>>>>
processed_;
// edge/vertex pairs we have yet to visit, for current and next depth
std::vector<std::pair<EdgeAccessor, VertexAccessor>,
utils::Allocator<std::pair<EdgeAccessor, VertexAccessor>>>
to_visit_current_;
std::vector<std::pair<EdgeAccessor, VertexAccessor>,
utils::Allocator<std::pair<EdgeAccessor, VertexAccessor>>>
to_visit_next_;
utils::AVector<std::pair<EdgeAccessor, VertexAccessor>> to_visit_current_;
utils::AVector<std::pair<EdgeAccessor, VertexAccessor>> to_visit_next_;
};
class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
@ -1360,7 +1369,12 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
ExpandWeightedShortestPathCursor(const ExpandVariable &self,
database::GraphDbAccessor *db,
utils::MemoryResource *mem)
: self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {}
: self_(self),
input_cursor_(self_.input_->MakeCursor(db, mem)),
total_cost_(mem),
previous_(mem),
yielded_vertices_(mem),
pq_(mem) {}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("ExpandWeightedShortestPath");
@ -1381,6 +1395,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
SwitchAccessor(edge, GraphView::OLD);
SwitchAccessor(vertex, GraphView::OLD);
auto *memory = evaluator.GetMemoryResource();
if (self_.filter_lambda_.expression) {
frame[self_.filter_lambda_.inner_edge_symbol] = edge;
frame[self_.filter_lambda_.inner_node_symbol] = vertex;
@ -1398,12 +1413,12 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
throw QueryRuntimeException(
"Calculated weight must be numeric, got {}.", typed_weight.type());
}
if ((typed_weight < TypedValue(0)).Value<bool>()) {
if ((typed_weight < TypedValue(0, memory)).Value<bool>()) {
throw QueryRuntimeException("Calculated weight must be non-negative!");
}
auto next_state = create_state(vertex, depth);
auto next_weight = TypedValue(weight) + typed_weight;
auto next_weight = TypedValue(weight, memory) + typed_weight;
auto found_it = total_cost_.find(next_state);
if (found_it != total_cost_.end() &&
found_it->second.Value<double>() <= next_weight.Value<double>())
@ -1415,7 +1430,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
// Populates the priority queue structure with expansions
// from the given vertex. skips expansions that don't satisfy
// the "where" condition.
auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex,
auto expand_from_vertex = [this, &expand_pair](const VertexAccessor &vertex,
double weight, int depth) {
if (self_.common_.direction != EdgeAtom::Direction::IN) {
for (const EdgeAccessor &edge : vertex.out(&self_.common_.edge_types)) {
@ -1433,11 +1448,11 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
if (context.db_accessor->should_abort()) throw HintedAbortError();
if (pq_.empty()) {
if (!input_cursor_->Pull(frame, context)) return false;
auto vertex_value = frame[self_.input_symbol_];
const auto &vertex_value = frame[self_.input_symbol_];
if (vertex_value.IsNull()) continue;
auto vertex = vertex_value.Value<VertexAccessor>();
if (self_.common_.existing_node) {
TypedValue &node = frame[self_.common_.node_symbol];
const auto &node = frame[self_.common_.node_symbol];
// Due to optional matching the existing node could be null.
// Skip expansion for such nodes.
if (node.IsNull()) continue;
@ -1499,7 +1514,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
// Reconstruct the path.
auto last_vertex = current_vertex;
auto last_depth = current_depth;
std::vector<TypedValue> edge_list{};
auto *pull_memory = context.evaluation_context.memory;
utils::AVector<TypedValue> edge_list(pull_memory);
while (true) {
// Origin_vertex must be in previous.
const auto &previous_edge =
@ -1514,8 +1530,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
// Place destination node on the frame, handle existence flag.
if (self_.common_.existing_node) {
TypedValue &node = frame[self_.common_.node_symbol];
if ((node != TypedValue(current_vertex)).Value<bool>())
const auto &node = frame[self_.common_.node_symbol];
if ((node != TypedValue(current_vertex, pull_memory)).Value<bool>())
continue;
else
// Prevent expanding other paths, because we found the
@ -1562,16 +1578,23 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
};
// Maps vertices to weights they got in expansion.
std::unordered_map<std::pair<VertexAccessor, int>, TypedValue, WspStateHash>
std::unordered_map<
std::pair<VertexAccessor, int>, TypedValue, WspStateHash, std::equal_to<>,
utils::Allocator<
std::pair<const std::pair<VertexAccessor, int>, TypedValue>>>
total_cost_;
// Maps vertices to edges used to reach them.
std::unordered_map<std::pair<VertexAccessor, int>,
std::optional<EdgeAccessor>, WspStateHash>
std::optional<EdgeAccessor>, WspStateHash, std::equal_to<>,
utils::Allocator<std::pair<
const std::pair<VertexAccessor, int>, TypedValue>>>
previous_;
// Keeps track of vertices for which we yielded a path already.
std::unordered_set<VertexAccessor> yielded_vertices_;
std::unordered_set<VertexAccessor, std::hash<VertexAccessor>, std::equal_to<>,
utils::Allocator<VertexAccessor>>
yielded_vertices_;
// Priority queue comparator. Keep lowest weight on top of the queue.
class PriorityQueueComparator {
@ -1586,7 +1609,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
std::priority_queue<
std::tuple<double, int, VertexAccessor, std::optional<EdgeAccessor>>,
std::vector<
utils::AVector<
std::tuple<double, int, VertexAccessor, std::optional<EdgeAccessor>>>,
PriorityQueueComparator>
pq_;
@ -1633,17 +1656,17 @@ class ConstructNamedPathCursor : public Cursor {
DCHECK(symbol_it != self_.path_elements_.end())
<< "Named path must contain at least one node";
TypedValue start_vertex = frame[*symbol_it++];
const auto &start_vertex = frame[*symbol_it++];
auto *pull_memory = context.evaluation_context.memory;
// In an OPTIONAL MATCH everything could be Null.
if (start_vertex.IsNull()) {
frame[self_.path_symbol_] = TypedValue();
frame[self_.path_symbol_] = TypedValue(pull_memory);
return true;
}
DCHECK(start_vertex.IsVertex())
<< "First named path element must be a vertex";
query::Path path(start_vertex.ValueVertex());
query::Path path(start_vertex.ValueVertex(), pull_memory);
// If the last path element symbol was for an edge list, then
// the next symbol is a vertex and it should not append to the path
@ -1652,12 +1675,12 @@ class ConstructNamedPathCursor : public Cursor {
bool last_was_edge_list = false;
for (; symbol_it != self_.path_elements_.end(); symbol_it++) {
TypedValue expansion = frame[*symbol_it];
const auto &expansion = frame[*symbol_it];
// We can have Null (OPTIONAL MATCH), a vertex, an edge, or an edge
// list (variable expand or BFS).
switch (expansion.type()) {
case TypedValue::Type::Null:
frame[self_.path_symbol_] = TypedValue();
frame[self_.path_symbol_] = TypedValue(pull_memory);
return true;
case TypedValue::Type::Vertex:
if (!last_was_edge_list) path.Expand(expansion.ValueVertex());
@ -1673,7 +1696,7 @@ class ConstructNamedPathCursor : public Cursor {
const auto &edges = expansion.ValueList();
for (const auto &edge_value : edges) {
const EdgeAccessor &edge = edge_value.ValueEdge();
const VertexAccessor from = edge.from();
const VertexAccessor &from = edge.from();
if (path.vertices().back() == from)
path.Expand(edge, edge.to());
else
@ -1835,10 +1858,11 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
ExpressionEvaluator evaluator(&frame, context.symbol_table,
context.evaluation_context, context.db_accessor,
GraphView::NEW);
auto *pull_memory = context.evaluation_context.memory;
// collect expressions results so edges can get deleted before vertices
// this is necessary because an edge that gets deleted could block vertex
// deletion
std::vector<TypedValue> expression_results;
utils::AVector<TypedValue> expression_results(pull_memory);
expression_results.reserve(self_.expressions_.size());
for (Expression *expression : self_.expressions_) {
expression_results.emplace_back(expression->Accept(evaluator));
@ -2260,9 +2284,9 @@ bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(
SCOPED_PROFILE_OP("EdgeUniquenessFilter");
auto expansion_ok = [&]() {
TypedValue &expand_value = frame[self_.expand_symbol_];
const auto &expand_value = frame[self_.expand_symbol_];
for (const auto &previous_symbol : self_.previous_symbols_) {
TypedValue &previous_value = frame[previous_symbol];
const auto &previous_value = frame[previous_symbol];
// This shouldn't raise a TypedValueException, because the planner
// makes sure these are all of the expected type. In case they are not
// an error should be raised long before this code is executed.
@ -2382,19 +2406,20 @@ namespace {
* This value is valid both for returning when where are no inputs
* to the aggregation op, and for initializing an aggregation result
* when there are */
TypedValue DefaultAggregationOpValue(const Aggregate::Element &element) {
TypedValue DefaultAggregationOpValue(const Aggregate::Element &element,
utils::MemoryResource *memory) {
switch (element.op) {
case Aggregation::Op::COUNT:
return TypedValue(0);
return TypedValue(0, memory);
case Aggregation::Op::SUM:
case Aggregation::Op::MIN:
case Aggregation::Op::MAX:
case Aggregation::Op::AVG:
return TypedValue();
return TypedValue(memory);
case Aggregation::Op::COLLECT_LIST:
return TypedValue(std::vector<TypedValue>());
return TypedValue(TypedValue::TVector(memory));
case Aggregation::Op::COLLECT_MAP:
return TypedValue(std::map<std::string, TypedValue>());
return TypedValue(TypedValue::TMap(memory));
}
}
} // namespace
@ -2418,12 +2443,13 @@ class AggregateCursor : public Cursor {
// in case there is no input and no group_bys we need to return true
// just this once
if (aggregation_.empty() && self_.group_by_.empty()) {
auto *pull_memory = context.evaluation_context.memory;
// place default aggregation values on the frame
for (const auto &elem : self_.aggregations_)
frame[elem.output_sym] = DefaultAggregationOpValue(elem);
frame[elem.output_sym] = DefaultAggregationOpValue(elem, pull_memory);
// place null as remember values on the frame
for (const Symbol &remember_sym : self_.remember_)
frame[remember_sym] = TypedValue();
frame[remember_sym] = TypedValue(pull_memory);
return true;
}
}
@ -2519,9 +2545,11 @@ class AggregateCursor : public Cursor {
for (auto &kv : aggregation_) {
AggregationValue &agg_value = kv.second;
int count = agg_value.counts_[pos];
auto *pull_memory = context->evaluation_context.memory;
if (count > 0)
agg_value.values_[pos] =
agg_value.values_[pos] / TypedValue(static_cast<double>(count));
agg_value.values_[pos] /
TypedValue(static_cast<double>(count), pull_memory);
}
}
}
@ -2550,8 +2578,10 @@ class AggregateCursor : public Cursor {
AggregateCursor::AggregationValue *agg_value) const {
if (!agg_value->values_.empty()) return;
for (const auto &agg_elem : self_.aggregations_)
agg_value->values_.emplace_back(DefaultAggregationOpValue(agg_elem));
for (const auto &agg_elem : self_.aggregations_) {
auto *mem = agg_value->values_.get_allocator().GetMemoryResource();
agg_value->values_.emplace_back(DefaultAggregationOpValue(agg_elem, mem));
}
agg_value->counts_.resize(self_.aggregations_.size(), 0);
for (const Symbol &remember_sym : self_.remember_)
@ -3091,7 +3121,7 @@ bool Optional::OptionalCursor::Pull(Frame &frame, ExecutionContext &context) {
// optional symbols to Null, ensure next time the
// input gets pulled and return true
for (const Symbol &sym : self_.optional_symbols_)
frame[sym] = TypedValue();
frame[sym] = TypedValue(context.evaluation_context.memory);
pull_input_ = true;
return true;
}
@ -3156,12 +3186,8 @@ class UnwindCursor : public Cursor {
throw QueryRuntimeException(
"Argument of UNWIND must be a list, but '{}' was provided.",
input_value.type());
// Copy the evaluted input_value_list to our vector. Since we use a
// different allocator, we cannot just do
// input_value_ = input_value.ValueList();
const auto &input_value_list = input_value.ValueList();
input_value_.reserve(input_value_list.size());
input_value_.assign(input_value_list.begin(), input_value_list.end());
// Copy the evaluted input_value_list to our vector.
input_value_ = input_value.ValueList();
input_value_it_ = input_value_.begin();
}
@ -3308,7 +3334,10 @@ Union::UnionCursor::UnionCursor(const Union &self,
bool Union::UnionCursor::Pull(Frame &frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("Union");
std::unordered_map<std::string, TypedValue> results;
std::unordered_map<std::string, TypedValue, std::hash<std::string>,
std::equal_to<>,
utils::Allocator<std::pair<const std::string, TypedValue>>>
results(context.evaluation_context.memory);
if (left_cursor_->Pull(frame, context)) {
// collect values from the left child
for (const auto &output_symbol : self_.left_symbols_) {
@ -3363,6 +3392,8 @@ class CartesianCursor : public Cursor {
CartesianCursor(const Cartesian &self, database::GraphDbAccessor *db,
utils::MemoryResource *mem)
: self_(self),
left_op_frames_(mem),
right_op_frame_(mem),
left_op_cursor_(self.left_op_->MakeCursor(db, mem)),
right_op_cursor_(self_.right_op_->MakeCursor(db, mem)) {
CHECK(left_op_cursor_ != nullptr)
@ -3377,7 +3408,8 @@ class CartesianCursor : public Cursor {
if (!cartesian_pull_initialized_) {
// Pull all left_op frames.
while (left_op_cursor_->Pull(frame, context)) {
left_op_frames_.emplace_back(frame.elems());
left_op_frames_.emplace_back(frame.elems().begin(),
frame.elems().end());
}
// We're setting the iterator to 'end' here so it pulls the right
@ -3391,8 +3423,8 @@ class CartesianCursor : public Cursor {
return false;
}
auto restore_frame = [&frame](const std::vector<Symbol> &symbols,
const std::vector<TypedValue> &restore_from) {
auto restore_frame = [&frame](const auto &symbols,
const auto &restore_from) {
for (const auto &symbol : symbols) {
frame[symbol] = restore_from[symbol.position()];
}
@ -3402,7 +3434,7 @@ class CartesianCursor : public Cursor {
// Advance right_op_cursor_.
if (!right_op_cursor_->Pull(frame, context)) return false;
right_op_frame_ = frame.elems();
right_op_frame_.assign(frame.elems().begin(), frame.elems().end());
left_op_frames_it_ = left_op_frames_.begin();
} else {
// Make sure right_op_cursor last pulled results are on frame.
@ -3432,11 +3464,11 @@ class CartesianCursor : public Cursor {
private:
const Cartesian &self_;
std::vector<std::vector<TypedValue>> left_op_frames_;
std::vector<TypedValue> right_op_frame_;
utils::AVector<utils::AVector<TypedValue>> left_op_frames_;
utils::AVector<TypedValue> right_op_frame_;
const UniqueCursorPtr left_op_cursor_;
const UniqueCursorPtr right_op_cursor_;
std::vector<std::vector<TypedValue>>::iterator left_op_frames_it_;
utils::AVector<utils::AVector<TypedValue>>::iterator left_op_frames_it_;
bool cartesian_pull_initialized_{false};
};

View File

@ -522,18 +522,6 @@ chained in cases when longer paths need creating.
// Get the existing node (if existing_node_ == true), or create a new node
VertexAccessor &OtherVertex(Frame &frame, ExecutionContext &context);
/**
* Helper function for creating an edge and adding it
* to the frame.
*
* @param from Origin vertex of the edge.
* @param to Destination vertex of the edge.
* @param evaluator Expression evaluator for property value eval.
*/
void CreateEdge(VertexAccessor &from, VertexAccessor &to, Frame &frame,
const SymbolTable &symbol_table,
ExpressionEvaluator &evaluator);
};
cpp<#)
(:serialize (:slk))

View File

@ -19,6 +19,7 @@ struct ProfilingStats {
unsigned long long num_cycles{0};
uint64_t key{0};
const char *name{nullptr};
// TODO: This should use the allocator for query execution
std::vector<ProfilingStats> children;
};

View File

@ -9,6 +9,7 @@
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
// Although <memory_resource> is in C++17, gcc libstdc++ still needs to
// implement it fully. It should be available in the next major release
// version, i.e. gcc 9.x.
@ -218,6 +219,9 @@ bool operator!=(const Allocator<T> &a, const Allocator<U> &b) {
return !(a == b);
}
template <class T>
using AVector = std::vector<T, Allocator<T>>;
/// Wraps std::pmr::memory_resource for use with out MemoryResource
class StdMemoryResource final : public MemoryResource {
public:

View File

@ -20,11 +20,15 @@ class MonotonicBufferResource final {
public:
utils::MemoryResource *get() { return &memory_; }
void Reset() { memory_.Release(); }
};
class NewDeleteResource final {
public:
utils::MemoryResource *get() { return utils::NewDeleteResource(); }
void Reset() {}
};
static void AddVertices(database::GraphDb *db, int vertex_count) {
@ -100,15 +104,15 @@ static void Distinct(benchmark::State &state) {
query::plan::MakeLogicalPlan(&context, parameters, false);
ResultStreamFaker<query::TypedValue> results;
query::Frame frame(symbol_table.max_position());
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
TMemory memory;
auto cursor = plan_and_cost.first->MakeCursor(&dba, memory.get());
while (cursor->Pull(frame, execution_context))
;
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
}
state.SetItemsProcessed(state.iterations());
}
@ -150,8 +154,9 @@ static void ExpandVariable(benchmark::State &state) {
MakeExpandVariable(query::EdgeAtom::Type::DEPTH_FIRST, &symbol_table);
auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
@ -159,8 +164,7 @@ static void ExpandVariable(benchmark::State &state) {
auto cursor = expand_variable.MakeCursor(&dba, memory.get());
for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
frame[expand_variable.input_symbol_] = query::TypedValue(v);
while (cursor->Pull(frame, execution_context))
;
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
}
}
state.SetItemsProcessed(state.iterations());
@ -186,8 +190,9 @@ static void ExpandBfs(benchmark::State &state) {
MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table);
auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
@ -195,8 +200,7 @@ static void ExpandBfs(benchmark::State &state) {
auto cursor = expand_variable.MakeCursor(&dba, memory.get());
for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
frame[expand_variable.input_symbol_] = query::TypedValue(v);
while (cursor->Pull(frame, execution_context))
;
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
}
}
state.SetItemsProcessed(state.iterations());
@ -224,8 +228,9 @@ static void ExpandShortest(benchmark::State &state) {
auto dest_symbol = expand_variable.common_.node_symbol;
auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
@ -235,8 +240,7 @@ static void ExpandShortest(benchmark::State &state) {
frame[expand_variable.input_symbol_] = query::TypedValue(v);
for (const auto &dest : dba.Vertices(false)) {
frame[dest_symbol] = query::TypedValue(dest);
while (cursor->Pull(frame, execution_context))
;
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
}
}
}
@ -251,6 +255,51 @@ BENCHMARK_TEMPLATE(ExpandShortest, MonotonicBufferResource)
->Range(512, 1U << 20U)
->Unit(benchmark::kMicrosecond);
template <class TMemory>
// NOLINTNEXTLINE(google-runtime-references)
static void ExpandWeightedShortest(benchmark::State &state) {
query::AstStorage ast;
query::Parameters parameters;
database::GraphDb db;
AddTree(&db, state.range(0));
query::SymbolTable symbol_table;
auto expand_variable = MakeExpandVariable(
query::EdgeAtom::Type::WEIGHTED_SHORTEST_PATH, &symbol_table);
expand_variable.common_.existing_node = true;
expand_variable.weight_lambda_ =
query::plan::ExpansionLambda{symbol_table.CreateSymbol("edge", false),
symbol_table.CreateSymbol("vertex", false),
ast.Create<query::PrimitiveLiteral>(1)};
auto dest_symbol = expand_variable.common_.node_symbol;
auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
TMemory memory;
auto cursor = expand_variable.MakeCursor(&dba, memory.get());
for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) {
frame[expand_variable.input_symbol_] = query::TypedValue(v);
for (const auto &dest : dba.Vertices(false)) {
frame[dest_symbol] = query::TypedValue(dest);
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
}
}
}
state.SetItemsProcessed(state.iterations());
}
BENCHMARK_TEMPLATE(ExpandWeightedShortest, NewDeleteResource)
->Range(512, 1U << 20U)
->Unit(benchmark::kMicrosecond);
BENCHMARK_TEMPLATE(ExpandWeightedShortest, MonotonicBufferResource)
->Range(512, 1U << 20U)
->Unit(benchmark::kMicrosecond);
template <class TMemory>
// NOLINTNEXTLINE(google-runtime-references)
static void Accumulate(benchmark::State &state) {
@ -270,15 +319,15 @@ static void Accumulate(benchmark::State &state) {
/* advance_command= */ false);
auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
TMemory memory;
auto cursor = accumulate.MakeCursor(&dba, memory.get());
while (cursor->Pull(frame, execution_context))
;
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
}
state.SetItemsProcessed(state.iterations());
}
@ -319,16 +368,19 @@ static void Aggregate(benchmark::State &state) {
query::plan::Aggregate aggregate(scan_all, aggregations, group_by, symbols);
auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
TMemory memory;
auto cursor = aggregate.MakeCursor(&dba, memory.get());
frame[symbols.front()] = query::TypedValue(0); // initial group_by value
while (cursor->Pull(frame, execution_context))
while (cursor->Pull(frame, execution_context)) {
frame[symbols.front()].ValueInt()++; // new group_by value
per_pull_memory.Reset();
}
}
state.SetItemsProcessed(state.iterations());
}
@ -366,15 +418,15 @@ static void OrderBy(benchmark::State &state) {
query::plan::OrderBy order_by(scan_all, sort_items, symbols);
auto dba = db.Access();
query::Frame frame(symbol_table.max_position());
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
TMemory memory;
auto cursor = order_by.MakeCursor(&dba, memory.get());
while (cursor->Pull(frame, execution_context))
;
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
}
state.SetItemsProcessed(state.iterations());
}
@ -405,15 +457,15 @@ static void Unwind(benchmark::State &state) {
query::Frame frame(symbol_table.max_position());
frame[list_sym] =
query::TypedValue(std::vector<query::TypedValue>(state.range(1)));
// Nothing should be used from the EvaluationContext, so leave it empty.
query::EvaluationContext evaluation_context;
// We need to only set the memory for temporary (per pull) evaluations
TMemory per_pull_memory;
query::EvaluationContext evaluation_context{per_pull_memory.get()};
while (state.KeepRunning()) {
query::ExecutionContext execution_context{&dba, symbol_table,
evaluation_context};
TMemory memory;
auto cursor = unwind.MakeCursor(&dba, memory.get());
while (cursor->Pull(frame, execution_context))
;
while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset();
}
state.SetItemsProcessed(state.iterations());
}