From 6086257204036176a9521f7f2e051f15ac4d2b28 Mon Sep 17 00:00:00 2001 From: Teon Banek <teon.banek@memgraph.io> Date: Tue, 11 Jun 2019 09:56:46 +0200 Subject: [PATCH] Use per_pull and whole execution allocators in Cursors Reviewers: mtomic, mferencevic, msantl Reviewed By: mtomic, msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2134 --- src/query/plan/operator.cpp | 250 ++++++++++++++++------------ src/query/plan/operator.lcp | 12 -- src/query/plan/profile.hpp | 1 + src/utils/memory.hpp | 4 + tests/benchmark/query/execution.cpp | 114 +++++++++---- 5 files changed, 229 insertions(+), 152 deletions(-) diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index fdc9e17a9..80e98860a 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -134,6 +134,8 @@ VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, ExpressionEvaluator evaluator(frame, context.symbol_table, context.evaluation_context, context.db_accessor, GraphView::NEW); + // TODO: PropsSetChecked allocates a PropertyValue, make it use context.memory + // when we update PropertyValue with custom allocator. for (auto &kv : node_info.properties) PropsSetChecked(&new_node, kv.first, kv.second->Accept(evaluator)); (*frame)[node_info.symbol] = new_node; @@ -205,6 +207,20 @@ CreateExpand::CreateExpandCursor::CreateExpandCursor( utils::MemoryResource *mem) : self_(self), db_(*db), input_cursor_(self.input_->MakeCursor(db, mem)) {} +namespace { + +void CreateEdge(const EdgeCreationInfo &edge_info, + database::GraphDbAccessor *dba, VertexAccessor *from, + VertexAccessor *to, Frame *frame, + ExpressionEvaluator *evaluator) { + EdgeAccessor edge = dba->InsertEdge(*from, *to, edge_info.edge_type); + for (auto kv : edge_info.properties) + PropsSetChecked(&edge, kv.first, kv.second->Accept(*evaluator)); + (*frame)[edge_info.symbol] = edge; +} + +} // namespace + bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("CreateExpand"); @@ -231,17 +247,17 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, // create an edge between the two nodes switch (self_.edge_info_.direction) { case EdgeAtom::Direction::IN: - CreateEdge(v2, v1, frame, context.symbol_table, evaluator); + CreateEdge(self_.edge_info_, &db_, &v2, &v1, &frame, &evaluator); break; case EdgeAtom::Direction::OUT: - CreateEdge(v1, v2, frame, context.symbol_table, evaluator); + CreateEdge(self_.edge_info_, &db_, &v1, &v2, &frame, &evaluator); break; case EdgeAtom::Direction::BOTH: // in the case of an undirected CreateExpand we choose an arbitrary // direction. this is used in the MERGE clause // it is not allowed in the CREATE clause, and the semantic // checker needs to ensure it doesn't reach this point - CreateEdge(v1, v2, frame, context.symbol_table, evaluator); + CreateEdge(self_.edge_info_, &db_, &v1, &v2, &frame, &evaluator); } return true; @@ -263,15 +279,6 @@ VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex( } } -void CreateExpand::CreateExpandCursor::CreateEdge( - VertexAccessor &from, VertexAccessor &to, Frame &frame, - const SymbolTable &symbol_table, ExpressionEvaluator &evaluator) { - EdgeAccessor edge = db_.InsertEdge(from, to, self_.edge_info_.edge_type); - for (auto kv : self_.edge_info_.properties) - PropsSetChecked(&edge, kv.first, kv.second->Accept(evaluator)); - frame[self_.edge_info_.symbol] = edge; -} - template <class TVerticesFun> class ScanAllCursor : public Cursor { public: @@ -661,11 +668,13 @@ namespace { * @param vertex - The vertex to expand from. * @param direction - Expansion direction. All directions (IN, OUT, BOTH) * are supported. + * @param memory - Used to allocate the result. * @return See above. */ auto ExpandFromVertex(const VertexAccessor &vertex, EdgeAtom::Direction direction, - const std::vector<storage::EdgeType> &edge_types) { + const std::vector<storage::EdgeType> &edge_types, + utils::MemoryResource *memory) { // wraps an EdgeAccessor into a pair <accessor, direction> auto wrapper = [](EdgeAtom::Direction direction, auto &&vertices) { return iter::imap( @@ -676,7 +685,8 @@ auto ExpandFromVertex(const VertexAccessor &vertex, }; // prepare a vector of elements we'll pass to the itertools - std::vector<decltype(wrapper(direction, vertex.in()))> chain_elements; + utils::AVector<decltype(wrapper(direction, vertex.in()))> chain_elements( + memory); if (direction != EdgeAtom::Direction::OUT && vertex.in_degree() > 0) { auto edges = vertex.in(&edge_types); @@ -693,6 +703,7 @@ auto ExpandFromVertex(const VertexAccessor &vertex, } } + // TODO: Investigate whether itertools perform heap allocation? return iter::chain.from_iterable(std::move(chain_elements)); } @@ -758,15 +769,13 @@ class ExpandVariableCursor : public Cursor { // a stack of edge iterables corresponding to the level/depth of // the expansion currently being Pulled - using ExpandEdges = decltype(ExpandFromVertex(std::declval<VertexAccessor>(), - EdgeAtom::Direction::IN, - self_.common_.edge_types)); - std::vector<ExpandEdges, utils::Allocator<ExpandEdges>> edges_; + using ExpandEdges = decltype( + ExpandFromVertex(std::declval<VertexAccessor>(), EdgeAtom::Direction::IN, + self_.common_.edge_types, utils::NewDeleteResource())); + utils::AVector<ExpandEdges> edges_; // an iterator indicating the position in the corresponding edges_ element - std::vector<decltype(edges_.begin()->begin()), - utils::Allocator<decltype(edges_.begin()->begin())>> - edges_it_; + utils::AVector<decltype(edges_.begin()->begin())> edges_it_; /** * Helper function that Pulls from the input vertex and @@ -776,8 +785,7 @@ class ExpandVariableCursor : public Cursor { * is exhausted. */ bool PullInput(Frame &frame, ExecutionContext &context) { - // Input Vertex could be null if it is created by a failed optional - // match. + // Input Vertex could be null if it is created by a failed optional match. // In those cases we skip that input pull and continue with the next. while (true) { if (context.db_accessor->should_abort()) throw HintedAbortError(); @@ -809,13 +817,15 @@ class ExpandVariableCursor : public Cursor { if (upper_bound_ > 0) { SwitchAccessor(vertex, GraphView::OLD); + auto *memory = edges_.get_allocator().GetMemoryResource(); edges_.emplace_back(ExpandFromVertex(vertex, self_.common_.direction, - self_.common_.edge_types)); + self_.common_.edge_types, memory)); edges_it_.emplace_back(edges_.back().begin()); } // reset the frame value to an empty edge list - frame[self_.common_.edge_symbol] = std::vector<TypedValue>(); + auto *pull_memory = context.evaluation_context.memory; + frame[self_.common_.edge_symbol] = TypedValue::TVector(pull_memory); return true; } @@ -926,8 +936,10 @@ class ExpandVariableCursor : public Cursor { // edge's expansions onto the stack, if we should continue to expand if (upper_bound_ > static_cast<int64_t>(edges_.size())) { SwitchAccessor(current_vertex, GraphView::OLD); - edges_.emplace_back(ExpandFromVertex( - current_vertex, self_.common_.direction, self_.common_.edge_types)); + auto *memory = edges_.get_allocator().GetMemoryResource(); + edges_.emplace_back(ExpandFromVertex(current_vertex, + self_.common_.direction, + self_.common_.edge_types, memory)); edges_it_.emplace_back(edges_.back().begin()); } @@ -945,9 +957,7 @@ class STShortestPathCursor : public query::plan::Cursor { STShortestPathCursor(const ExpandVariable &self, database::GraphDbAccessor *dba, utils::MemoryResource *mem) - : mem_(mem), - self_(self), - input_cursor_(self_.input()->MakeCursor(dba, mem)) { + : self_(self), input_cursor_(self_.input()->MakeCursor(dba, mem)) { CHECK(self_.common_.existing_node) << "s-t shortest path algorithm should only " "be used when `existing_node` flag is " @@ -961,15 +971,15 @@ class STShortestPathCursor : public query::plan::Cursor { context.evaluation_context, context.db_accessor, GraphView::OLD); while (input_cursor_->Pull(frame, context)) { - auto source_tv = frame[self_.input_symbol_]; - auto sink_tv = frame[self_.common_.node_symbol]; + const auto &source_tv = frame[self_.input_symbol_]; + const auto &sink_tv = frame[self_.common_.node_symbol]; // It is possible that source or sink vertex is Null due to optional // matching. if (source_tv.IsNull() || sink_tv.IsNull()) continue; - auto source = source_tv.ValueVertex(); - auto sink = sink_tv.ValueVertex(); + const auto &source = source_tv.ValueVertex(); + const auto &sink = sink_tv.ValueVertex(); int64_t lower_bound = self_.lower_bound_ @@ -997,7 +1007,6 @@ class STShortestPathCursor : public query::plan::Cursor { void Reset() override { input_cursor_->Reset(); } private: - utils::MemoryResource *mem_; const ExpandVariable &self_; UniqueCursorPtr input_cursor_; @@ -1009,8 +1018,9 @@ class STShortestPathCursor : public query::plan::Cursor { void ReconstructPath(const VertexAccessor &midpoint, const VertexEdgeMapT &in_edge, - const VertexEdgeMapT &out_edge, Frame *frame) { - std::vector<TypedValue> result; + const VertexEdgeMapT &out_edge, Frame *frame, + utils::MemoryResource *pull_memory) { + utils::AVector<TypedValue> result(pull_memory); auto last_vertex = midpoint; while (true) { const auto &last_edge = in_edge.at(last_vertex); @@ -1059,24 +1069,21 @@ class STShortestPathCursor : public query::plan::Cursor { // perform better for real-world like graphs where the expansion front // grows exponentially, effectively reducing the exponent by half. + auto *pull_memory = evaluator->GetMemoryResource(); // Holds vertices at the current level of expansion from the source // (sink). - std::vector<VertexAccessor, utils::Allocator<VertexAccessor>> - source_frontier(mem_); - std::vector<VertexAccessor, utils::Allocator<VertexAccessor>> sink_frontier( - mem_); + utils::AVector<VertexAccessor> source_frontier(pull_memory); + utils::AVector<VertexAccessor> sink_frontier(pull_memory); // Holds vertices we can expand to from `source_frontier` // (`sink_frontier`). - std::vector<VertexAccessor, utils::Allocator<VertexAccessor>> source_next( - mem_); - std::vector<VertexAccessor, utils::Allocator<VertexAccessor>> sink_next( - mem_); + utils::AVector<VertexAccessor> source_next(pull_memory); + utils::AVector<VertexAccessor> sink_next(pull_memory); // Maps each vertex we visited expanding from the source (sink) to the // edge used. Necessary for path reconstruction. - VertexEdgeMapT in_edge(mem_); - VertexEdgeMapT out_edge(mem_); + VertexEdgeMapT in_edge(pull_memory); + VertexEdgeMapT out_edge(pull_memory); size_t current_length = 0; @@ -1099,7 +1106,8 @@ class STShortestPathCursor : public query::plan::Cursor { in_edge.emplace(edge.to(), edge); if (Contains(out_edge, edge.to())) { if (current_length >= lower_bound) { - ReconstructPath(edge.to(), in_edge, out_edge, frame); + ReconstructPath(edge.to(), in_edge, out_edge, frame, + pull_memory); return true; } else { return false; @@ -1116,7 +1124,8 @@ class STShortestPathCursor : public query::plan::Cursor { in_edge.emplace(edge.from(), edge); if (Contains(out_edge, edge.from())) { if (current_length >= lower_bound) { - ReconstructPath(edge.from(), in_edge, out_edge, frame); + ReconstructPath(edge.from(), in_edge, out_edge, frame, + pull_memory); return true; } else { return false; @@ -1147,7 +1156,8 @@ class STShortestPathCursor : public query::plan::Cursor { out_edge.emplace(edge.to(), edge); if (Contains(in_edge, edge.to())) { if (current_length >= lower_bound) { - ReconstructPath(edge.to(), in_edge, out_edge, frame); + ReconstructPath(edge.to(), in_edge, out_edge, frame, + pull_memory); return true; } else { return false; @@ -1164,7 +1174,8 @@ class STShortestPathCursor : public query::plan::Cursor { out_edge.emplace(edge.from(), edge); if (Contains(in_edge, edge.from())) { if (current_length >= lower_bound) { - ReconstructPath(edge.from(), in_edge, out_edge, frame); + ReconstructPath(edge.from(), in_edge, out_edge, frame, + pull_memory); return true; } else { return false; @@ -1237,7 +1248,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { // populates the to_visit_next_ structure with expansions // from the given vertex. skips expansions that don't satisfy // the "where" condition. - auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex) { + auto expand_from_vertex = [this, &expand_pair](const auto &vertex) { if (self_.common_.direction != EdgeAtom::Direction::IN) { for (const EdgeAccessor &edge : vertex.out(&self_.common_.edge_types)) expand_pair(edge, edge.to()); @@ -1263,7 +1274,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { to_visit_next_.clear(); processed_.clear(); - auto vertex_value = frame[self_.input_symbol_]; + const auto &vertex_value = frame[self_.input_symbol_]; // it is possible that the vertex is Null due to optional matching if (vertex_value.IsNull()) continue; lower_bound_ = self_.lower_bound_ @@ -1277,7 +1288,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { if (upper_bound_ < 1 || lower_bound_ > upper_bound_) continue; - auto vertex = vertex_value.Value<VertexAccessor>(); + const auto &vertex = vertex_value.Value<VertexAccessor>(); processed_.emplace(vertex, std::nullopt); expand_from_vertex(vertex); @@ -1291,7 +1302,9 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { to_visit_current_.pop_back(); // create the frame value for the edges - std::vector<TypedValue> edge_list{TypedValue(expansion.first)}; + auto *pull_memory = context.evaluation_context.memory; + utils::AVector<TypedValue> edge_list(pull_memory); + edge_list.emplace_back(expansion.first); auto last_vertex = expansion.second; while (true) { const EdgeAccessor &last_edge = edge_list.back().Value<EdgeAccessor>(); @@ -1347,12 +1360,8 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { std::optional<EdgeAccessor>>>> processed_; // edge/vertex pairs we have yet to visit, for current and next depth - std::vector<std::pair<EdgeAccessor, VertexAccessor>, - utils::Allocator<std::pair<EdgeAccessor, VertexAccessor>>> - to_visit_current_; - std::vector<std::pair<EdgeAccessor, VertexAccessor>, - utils::Allocator<std::pair<EdgeAccessor, VertexAccessor>>> - to_visit_next_; + utils::AVector<std::pair<EdgeAccessor, VertexAccessor>> to_visit_current_; + utils::AVector<std::pair<EdgeAccessor, VertexAccessor>> to_visit_next_; }; class ExpandWeightedShortestPathCursor : public query::plan::Cursor { @@ -1360,7 +1369,12 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { ExpandWeightedShortestPathCursor(const ExpandVariable &self, database::GraphDbAccessor *db, utils::MemoryResource *mem) - : self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {} + : self_(self), + input_cursor_(self_.input_->MakeCursor(db, mem)), + total_cost_(mem), + previous_(mem), + yielded_vertices_(mem), + pq_(mem) {} bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("ExpandWeightedShortestPath"); @@ -1381,6 +1395,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { SwitchAccessor(edge, GraphView::OLD); SwitchAccessor(vertex, GraphView::OLD); + auto *memory = evaluator.GetMemoryResource(); if (self_.filter_lambda_.expression) { frame[self_.filter_lambda_.inner_edge_symbol] = edge; frame[self_.filter_lambda_.inner_node_symbol] = vertex; @@ -1398,12 +1413,12 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { throw QueryRuntimeException( "Calculated weight must be numeric, got {}.", typed_weight.type()); } - if ((typed_weight < TypedValue(0)).Value<bool>()) { + if ((typed_weight < TypedValue(0, memory)).Value<bool>()) { throw QueryRuntimeException("Calculated weight must be non-negative!"); } auto next_state = create_state(vertex, depth); - auto next_weight = TypedValue(weight) + typed_weight; + auto next_weight = TypedValue(weight, memory) + typed_weight; auto found_it = total_cost_.find(next_state); if (found_it != total_cost_.end() && found_it->second.Value<double>() <= next_weight.Value<double>()) @@ -1415,7 +1430,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { // Populates the priority queue structure with expansions // from the given vertex. skips expansions that don't satisfy // the "where" condition. - auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex, + auto expand_from_vertex = [this, &expand_pair](const VertexAccessor &vertex, double weight, int depth) { if (self_.common_.direction != EdgeAtom::Direction::IN) { for (const EdgeAccessor &edge : vertex.out(&self_.common_.edge_types)) { @@ -1433,11 +1448,11 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { if (context.db_accessor->should_abort()) throw HintedAbortError(); if (pq_.empty()) { if (!input_cursor_->Pull(frame, context)) return false; - auto vertex_value = frame[self_.input_symbol_]; + const auto &vertex_value = frame[self_.input_symbol_]; if (vertex_value.IsNull()) continue; auto vertex = vertex_value.Value<VertexAccessor>(); if (self_.common_.existing_node) { - TypedValue &node = frame[self_.common_.node_symbol]; + const auto &node = frame[self_.common_.node_symbol]; // Due to optional matching the existing node could be null. // Skip expansion for such nodes. if (node.IsNull()) continue; @@ -1499,7 +1514,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { // Reconstruct the path. auto last_vertex = current_vertex; auto last_depth = current_depth; - std::vector<TypedValue> edge_list{}; + auto *pull_memory = context.evaluation_context.memory; + utils::AVector<TypedValue> edge_list(pull_memory); while (true) { // Origin_vertex must be in previous. const auto &previous_edge = @@ -1514,8 +1530,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { // Place destination node on the frame, handle existence flag. if (self_.common_.existing_node) { - TypedValue &node = frame[self_.common_.node_symbol]; - if ((node != TypedValue(current_vertex)).Value<bool>()) + const auto &node = frame[self_.common_.node_symbol]; + if ((node != TypedValue(current_vertex, pull_memory)).Value<bool>()) continue; else // Prevent expanding other paths, because we found the @@ -1562,16 +1578,23 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { }; // Maps vertices to weights they got in expansion. - std::unordered_map<std::pair<VertexAccessor, int>, TypedValue, WspStateHash> + std::unordered_map< + std::pair<VertexAccessor, int>, TypedValue, WspStateHash, std::equal_to<>, + utils::Allocator< + std::pair<const std::pair<VertexAccessor, int>, TypedValue>>> total_cost_; // Maps vertices to edges used to reach them. std::unordered_map<std::pair<VertexAccessor, int>, - std::optional<EdgeAccessor>, WspStateHash> + std::optional<EdgeAccessor>, WspStateHash, std::equal_to<>, + utils::Allocator<std::pair< + const std::pair<VertexAccessor, int>, TypedValue>>> previous_; // Keeps track of vertices for which we yielded a path already. - std::unordered_set<VertexAccessor> yielded_vertices_; + std::unordered_set<VertexAccessor, std::hash<VertexAccessor>, std::equal_to<>, + utils::Allocator<VertexAccessor>> + yielded_vertices_; // Priority queue comparator. Keep lowest weight on top of the queue. class PriorityQueueComparator { @@ -1586,7 +1609,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { std::priority_queue< std::tuple<double, int, VertexAccessor, std::optional<EdgeAccessor>>, - std::vector< + utils::AVector< std::tuple<double, int, VertexAccessor, std::optional<EdgeAccessor>>>, PriorityQueueComparator> pq_; @@ -1633,17 +1656,17 @@ class ConstructNamedPathCursor : public Cursor { DCHECK(symbol_it != self_.path_elements_.end()) << "Named path must contain at least one node"; - TypedValue start_vertex = frame[*symbol_it++]; - + const auto &start_vertex = frame[*symbol_it++]; + auto *pull_memory = context.evaluation_context.memory; // In an OPTIONAL MATCH everything could be Null. if (start_vertex.IsNull()) { - frame[self_.path_symbol_] = TypedValue(); + frame[self_.path_symbol_] = TypedValue(pull_memory); return true; } DCHECK(start_vertex.IsVertex()) << "First named path element must be a vertex"; - query::Path path(start_vertex.ValueVertex()); + query::Path path(start_vertex.ValueVertex(), pull_memory); // If the last path element symbol was for an edge list, then // the next symbol is a vertex and it should not append to the path @@ -1652,12 +1675,12 @@ class ConstructNamedPathCursor : public Cursor { bool last_was_edge_list = false; for (; symbol_it != self_.path_elements_.end(); symbol_it++) { - TypedValue expansion = frame[*symbol_it]; + const auto &expansion = frame[*symbol_it]; // We can have Null (OPTIONAL MATCH), a vertex, an edge, or an edge // list (variable expand or BFS). switch (expansion.type()) { case TypedValue::Type::Null: - frame[self_.path_symbol_] = TypedValue(); + frame[self_.path_symbol_] = TypedValue(pull_memory); return true; case TypedValue::Type::Vertex: if (!last_was_edge_list) path.Expand(expansion.ValueVertex()); @@ -1673,7 +1696,7 @@ class ConstructNamedPathCursor : public Cursor { const auto &edges = expansion.ValueList(); for (const auto &edge_value : edges) { const EdgeAccessor &edge = edge_value.ValueEdge(); - const VertexAccessor from = edge.from(); + const VertexAccessor &from = edge.from(); if (path.vertices().back() == from) path.Expand(edge, edge.to()); else @@ -1835,10 +1858,11 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, GraphView::NEW); + auto *pull_memory = context.evaluation_context.memory; // collect expressions results so edges can get deleted before vertices // this is necessary because an edge that gets deleted could block vertex // deletion - std::vector<TypedValue> expression_results; + utils::AVector<TypedValue> expression_results(pull_memory); expression_results.reserve(self_.expressions_.size()); for (Expression *expression : self_.expressions_) { expression_results.emplace_back(expression->Accept(evaluator)); @@ -2260,9 +2284,9 @@ bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull( SCOPED_PROFILE_OP("EdgeUniquenessFilter"); auto expansion_ok = [&]() { - TypedValue &expand_value = frame[self_.expand_symbol_]; + const auto &expand_value = frame[self_.expand_symbol_]; for (const auto &previous_symbol : self_.previous_symbols_) { - TypedValue &previous_value = frame[previous_symbol]; + const auto &previous_value = frame[previous_symbol]; // This shouldn't raise a TypedValueException, because the planner // makes sure these are all of the expected type. In case they are not // an error should be raised long before this code is executed. @@ -2382,19 +2406,20 @@ namespace { * This value is valid both for returning when where are no inputs * to the aggregation op, and for initializing an aggregation result * when there are */ -TypedValue DefaultAggregationOpValue(const Aggregate::Element &element) { +TypedValue DefaultAggregationOpValue(const Aggregate::Element &element, + utils::MemoryResource *memory) { switch (element.op) { case Aggregation::Op::COUNT: - return TypedValue(0); + return TypedValue(0, memory); case Aggregation::Op::SUM: case Aggregation::Op::MIN: case Aggregation::Op::MAX: case Aggregation::Op::AVG: - return TypedValue(); + return TypedValue(memory); case Aggregation::Op::COLLECT_LIST: - return TypedValue(std::vector<TypedValue>()); + return TypedValue(TypedValue::TVector(memory)); case Aggregation::Op::COLLECT_MAP: - return TypedValue(std::map<std::string, TypedValue>()); + return TypedValue(TypedValue::TMap(memory)); } } } // namespace @@ -2418,12 +2443,13 @@ class AggregateCursor : public Cursor { // in case there is no input and no group_bys we need to return true // just this once if (aggregation_.empty() && self_.group_by_.empty()) { + auto *pull_memory = context.evaluation_context.memory; // place default aggregation values on the frame for (const auto &elem : self_.aggregations_) - frame[elem.output_sym] = DefaultAggregationOpValue(elem); + frame[elem.output_sym] = DefaultAggregationOpValue(elem, pull_memory); // place null as remember values on the frame for (const Symbol &remember_sym : self_.remember_) - frame[remember_sym] = TypedValue(); + frame[remember_sym] = TypedValue(pull_memory); return true; } } @@ -2519,9 +2545,11 @@ class AggregateCursor : public Cursor { for (auto &kv : aggregation_) { AggregationValue &agg_value = kv.second; int count = agg_value.counts_[pos]; + auto *pull_memory = context->evaluation_context.memory; if (count > 0) agg_value.values_[pos] = - agg_value.values_[pos] / TypedValue(static_cast<double>(count)); + agg_value.values_[pos] / + TypedValue(static_cast<double>(count), pull_memory); } } } @@ -2550,8 +2578,10 @@ class AggregateCursor : public Cursor { AggregateCursor::AggregationValue *agg_value) const { if (!agg_value->values_.empty()) return; - for (const auto &agg_elem : self_.aggregations_) - agg_value->values_.emplace_back(DefaultAggregationOpValue(agg_elem)); + for (const auto &agg_elem : self_.aggregations_) { + auto *mem = agg_value->values_.get_allocator().GetMemoryResource(); + agg_value->values_.emplace_back(DefaultAggregationOpValue(agg_elem, mem)); + } agg_value->counts_.resize(self_.aggregations_.size(), 0); for (const Symbol &remember_sym : self_.remember_) @@ -3091,7 +3121,7 @@ bool Optional::OptionalCursor::Pull(Frame &frame, ExecutionContext &context) { // optional symbols to Null, ensure next time the // input gets pulled and return true for (const Symbol &sym : self_.optional_symbols_) - frame[sym] = TypedValue(); + frame[sym] = TypedValue(context.evaluation_context.memory); pull_input_ = true; return true; } @@ -3156,12 +3186,8 @@ class UnwindCursor : public Cursor { throw QueryRuntimeException( "Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); - // Copy the evaluted input_value_list to our vector. Since we use a - // different allocator, we cannot just do - // input_value_ = input_value.ValueList(); - const auto &input_value_list = input_value.ValueList(); - input_value_.reserve(input_value_list.size()); - input_value_.assign(input_value_list.begin(), input_value_list.end()); + // Copy the evaluted input_value_list to our vector. + input_value_ = input_value.ValueList(); input_value_it_ = input_value_.begin(); } @@ -3308,7 +3334,10 @@ Union::UnionCursor::UnionCursor(const Union &self, bool Union::UnionCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("Union"); - std::unordered_map<std::string, TypedValue> results; + std::unordered_map<std::string, TypedValue, std::hash<std::string>, + std::equal_to<>, + utils::Allocator<std::pair<const std::string, TypedValue>>> + results(context.evaluation_context.memory); if (left_cursor_->Pull(frame, context)) { // collect values from the left child for (const auto &output_symbol : self_.left_symbols_) { @@ -3363,6 +3392,8 @@ class CartesianCursor : public Cursor { CartesianCursor(const Cartesian &self, database::GraphDbAccessor *db, utils::MemoryResource *mem) : self_(self), + left_op_frames_(mem), + right_op_frame_(mem), left_op_cursor_(self.left_op_->MakeCursor(db, mem)), right_op_cursor_(self_.right_op_->MakeCursor(db, mem)) { CHECK(left_op_cursor_ != nullptr) @@ -3377,7 +3408,8 @@ class CartesianCursor : public Cursor { if (!cartesian_pull_initialized_) { // Pull all left_op frames. while (left_op_cursor_->Pull(frame, context)) { - left_op_frames_.emplace_back(frame.elems()); + left_op_frames_.emplace_back(frame.elems().begin(), + frame.elems().end()); } // We're setting the iterator to 'end' here so it pulls the right @@ -3391,8 +3423,8 @@ class CartesianCursor : public Cursor { return false; } - auto restore_frame = [&frame](const std::vector<Symbol> &symbols, - const std::vector<TypedValue> &restore_from) { + auto restore_frame = [&frame](const auto &symbols, + const auto &restore_from) { for (const auto &symbol : symbols) { frame[symbol] = restore_from[symbol.position()]; } @@ -3402,7 +3434,7 @@ class CartesianCursor : public Cursor { // Advance right_op_cursor_. if (!right_op_cursor_->Pull(frame, context)) return false; - right_op_frame_ = frame.elems(); + right_op_frame_.assign(frame.elems().begin(), frame.elems().end()); left_op_frames_it_ = left_op_frames_.begin(); } else { // Make sure right_op_cursor last pulled results are on frame. @@ -3432,11 +3464,11 @@ class CartesianCursor : public Cursor { private: const Cartesian &self_; - std::vector<std::vector<TypedValue>> left_op_frames_; - std::vector<TypedValue> right_op_frame_; + utils::AVector<utils::AVector<TypedValue>> left_op_frames_; + utils::AVector<TypedValue> right_op_frame_; const UniqueCursorPtr left_op_cursor_; const UniqueCursorPtr right_op_cursor_; - std::vector<std::vector<TypedValue>>::iterator left_op_frames_it_; + utils::AVector<utils::AVector<TypedValue>>::iterator left_op_frames_it_; bool cartesian_pull_initialized_{false}; }; diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index 090de7211..75eba007d 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -522,18 +522,6 @@ chained in cases when longer paths need creating. // Get the existing node (if existing_node_ == true), or create a new node VertexAccessor &OtherVertex(Frame &frame, ExecutionContext &context); - - /** - * Helper function for creating an edge and adding it - * to the frame. - * - * @param from Origin vertex of the edge. - * @param to Destination vertex of the edge. - * @param evaluator Expression evaluator for property value eval. - */ - void CreateEdge(VertexAccessor &from, VertexAccessor &to, Frame &frame, - const SymbolTable &symbol_table, - ExpressionEvaluator &evaluator); }; cpp<#) (:serialize (:slk)) diff --git a/src/query/plan/profile.hpp b/src/query/plan/profile.hpp index f9c482472..c405e3f56 100644 --- a/src/query/plan/profile.hpp +++ b/src/query/plan/profile.hpp @@ -19,6 +19,7 @@ struct ProfilingStats { unsigned long long num_cycles{0}; uint64_t key{0}; const char *name{nullptr}; + // TODO: This should use the allocator for query execution std::vector<ProfilingStats> children; }; diff --git a/src/utils/memory.hpp b/src/utils/memory.hpp index 96c76f327..73f79edb6 100644 --- a/src/utils/memory.hpp +++ b/src/utils/memory.hpp @@ -9,6 +9,7 @@ #include <tuple> #include <type_traits> #include <utility> +#include <vector> // Although <memory_resource> is in C++17, gcc libstdc++ still needs to // implement it fully. It should be available in the next major release // version, i.e. gcc 9.x. @@ -218,6 +219,9 @@ bool operator!=(const Allocator<T> &a, const Allocator<U> &b) { return !(a == b); } +template <class T> +using AVector = std::vector<T, Allocator<T>>; + /// Wraps std::pmr::memory_resource for use with out MemoryResource class StdMemoryResource final : public MemoryResource { public: diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp index 442ae04c1..5661edaab 100644 --- a/tests/benchmark/query/execution.cpp +++ b/tests/benchmark/query/execution.cpp @@ -20,11 +20,15 @@ class MonotonicBufferResource final { public: utils::MemoryResource *get() { return &memory_; } + + void Reset() { memory_.Release(); } }; class NewDeleteResource final { public: utils::MemoryResource *get() { return utils::NewDeleteResource(); } + + void Reset() {} }; static void AddVertices(database::GraphDb *db, int vertex_count) { @@ -100,15 +104,15 @@ static void Distinct(benchmark::State &state) { query::plan::MakeLogicalPlan(&context, parameters, false); ResultStreamFaker<query::TypedValue> results; query::Frame frame(symbol_table.max_position()); - // Nothing should be used from the EvaluationContext, so leave it empty. - query::EvaluationContext evaluation_context; + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; auto cursor = plan_and_cost.first->MakeCursor(&dba, memory.get()); - while (cursor->Pull(frame, execution_context)) - ; + while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } state.SetItemsProcessed(state.iterations()); } @@ -150,8 +154,9 @@ static void ExpandVariable(benchmark::State &state) { MakeExpandVariable(query::EdgeAtom::Type::DEPTH_FIRST, &symbol_table); auto dba = db.Access(); query::Frame frame(symbol_table.max_position()); - // Nothing should be used from the EvaluationContext, so leave it empty. - query::EvaluationContext evaluation_context; + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; @@ -159,8 +164,7 @@ static void ExpandVariable(benchmark::State &state) { auto cursor = expand_variable.MakeCursor(&dba, memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); - while (cursor->Pull(frame, execution_context)) - ; + while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } } state.SetItemsProcessed(state.iterations()); @@ -186,8 +190,9 @@ static void ExpandBfs(benchmark::State &state) { MakeExpandVariable(query::EdgeAtom::Type::BREADTH_FIRST, &symbol_table); auto dba = db.Access(); query::Frame frame(symbol_table.max_position()); - // Nothing should be used from the EvaluationContext, so leave it empty. - query::EvaluationContext evaluation_context; + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; @@ -195,8 +200,7 @@ static void ExpandBfs(benchmark::State &state) { auto cursor = expand_variable.MakeCursor(&dba, memory.get()); for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { frame[expand_variable.input_symbol_] = query::TypedValue(v); - while (cursor->Pull(frame, execution_context)) - ; + while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } } state.SetItemsProcessed(state.iterations()); @@ -224,8 +228,9 @@ static void ExpandShortest(benchmark::State &state) { auto dest_symbol = expand_variable.common_.node_symbol; auto dba = db.Access(); query::Frame frame(symbol_table.max_position()); - // Nothing should be used from the EvaluationContext, so leave it empty. - query::EvaluationContext evaluation_context; + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; @@ -235,8 +240,7 @@ static void ExpandShortest(benchmark::State &state) { frame[expand_variable.input_symbol_] = query::TypedValue(v); for (const auto &dest : dba.Vertices(false)) { frame[dest_symbol] = query::TypedValue(dest); - while (cursor->Pull(frame, execution_context)) - ; + while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } } } @@ -251,6 +255,51 @@ BENCHMARK_TEMPLATE(ExpandShortest, MonotonicBufferResource) ->Range(512, 1U << 20U) ->Unit(benchmark::kMicrosecond); +template <class TMemory> +// NOLINTNEXTLINE(google-runtime-references) +static void ExpandWeightedShortest(benchmark::State &state) { + query::AstStorage ast; + query::Parameters parameters; + database::GraphDb db; + AddTree(&db, state.range(0)); + query::SymbolTable symbol_table; + auto expand_variable = MakeExpandVariable( + query::EdgeAtom::Type::WEIGHTED_SHORTEST_PATH, &symbol_table); + expand_variable.common_.existing_node = true; + expand_variable.weight_lambda_ = + query::plan::ExpansionLambda{symbol_table.CreateSymbol("edge", false), + symbol_table.CreateSymbol("vertex", false), + ast.Create<query::PrimitiveLiteral>(1)}; + auto dest_symbol = expand_variable.common_.node_symbol; + auto dba = db.Access(); + query::Frame frame(symbol_table.max_position()); + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; + while (state.KeepRunning()) { + query::ExecutionContext execution_context{&dba, symbol_table, + evaluation_context}; + TMemory memory; + auto cursor = expand_variable.MakeCursor(&dba, memory.get()); + for (const auto &v : dba.Vertices(dba.Label(kStartLabel), false)) { + frame[expand_variable.input_symbol_] = query::TypedValue(v); + for (const auto &dest : dba.Vertices(false)) { + frame[dest_symbol] = query::TypedValue(dest); + while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); + } + } + } + state.SetItemsProcessed(state.iterations()); +} + +BENCHMARK_TEMPLATE(ExpandWeightedShortest, NewDeleteResource) + ->Range(512, 1U << 20U) + ->Unit(benchmark::kMicrosecond); + +BENCHMARK_TEMPLATE(ExpandWeightedShortest, MonotonicBufferResource) + ->Range(512, 1U << 20U) + ->Unit(benchmark::kMicrosecond); + template <class TMemory> // NOLINTNEXTLINE(google-runtime-references) static void Accumulate(benchmark::State &state) { @@ -270,15 +319,15 @@ static void Accumulate(benchmark::State &state) { /* advance_command= */ false); auto dba = db.Access(); query::Frame frame(symbol_table.max_position()); - // Nothing should be used from the EvaluationContext, so leave it empty. - query::EvaluationContext evaluation_context; + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; auto cursor = accumulate.MakeCursor(&dba, memory.get()); - while (cursor->Pull(frame, execution_context)) - ; + while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } state.SetItemsProcessed(state.iterations()); } @@ -319,16 +368,19 @@ static void Aggregate(benchmark::State &state) { query::plan::Aggregate aggregate(scan_all, aggregations, group_by, symbols); auto dba = db.Access(); query::Frame frame(symbol_table.max_position()); - // Nothing should be used from the EvaluationContext, so leave it empty. - query::EvaluationContext evaluation_context; + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; auto cursor = aggregate.MakeCursor(&dba, memory.get()); frame[symbols.front()] = query::TypedValue(0); // initial group_by value - while (cursor->Pull(frame, execution_context)) + while (cursor->Pull(frame, execution_context)) { frame[symbols.front()].ValueInt()++; // new group_by value + per_pull_memory.Reset(); + } } state.SetItemsProcessed(state.iterations()); } @@ -366,15 +418,15 @@ static void OrderBy(benchmark::State &state) { query::plan::OrderBy order_by(scan_all, sort_items, symbols); auto dba = db.Access(); query::Frame frame(symbol_table.max_position()); - // Nothing should be used from the EvaluationContext, so leave it empty. - query::EvaluationContext evaluation_context; + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; auto cursor = order_by.MakeCursor(&dba, memory.get()); - while (cursor->Pull(frame, execution_context)) - ; + while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } state.SetItemsProcessed(state.iterations()); } @@ -405,15 +457,15 @@ static void Unwind(benchmark::State &state) { query::Frame frame(symbol_table.max_position()); frame[list_sym] = query::TypedValue(std::vector<query::TypedValue>(state.range(1))); - // Nothing should be used from the EvaluationContext, so leave it empty. - query::EvaluationContext evaluation_context; + // We need to only set the memory for temporary (per pull) evaluations + TMemory per_pull_memory; + query::EvaluationContext evaluation_context{per_pull_memory.get()}; while (state.KeepRunning()) { query::ExecutionContext execution_context{&dba, symbol_table, evaluation_context}; TMemory memory; auto cursor = unwind.MakeCursor(&dba, memory.get()); - while (cursor->Pull(frame, execution_context)) - ; + while (cursor->Pull(frame, execution_context)) per_pull_memory.Reset(); } state.SetItemsProcessed(state.iterations()); }