From f8ef7eb2df91330ad6ca09215ad709a39313d023 Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Wed, 26 Sep 2018 11:32:26 +0200 Subject: [PATCH] Check if DB should abort in time consuming operators Reviewers: mtomic, mferencevic Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1611 --- src/query/plan/distributed_ops.cpp | 1 + src/query/plan/operator.cpp | 194 ++++++++++++++++------------- 2 files changed, 108 insertions(+), 87 deletions(-) diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index 0e57514e4..df165ba65 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -1054,6 +1054,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { &context.db_accessor_, self_.graph_view()); while (true) { + if (context.db_accessor_.should_abort()) throw HintedAbortError(); TypedValue last_vertex; if (!skip_rest_) { diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 6adc02154..e54675ea3 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -733,6 +733,7 @@ class ExpandVariableCursor : public Cursor { // match. // In those cases we skip that input pull and continue with the next. while (true) { + if (context.db_accessor_.should_abort()) throw HintedAbortError(); if (!input_cursor_->Pull(frame, context)) return false; TypedValue &vertex_value = frame[self_.input_symbol_]; @@ -813,6 +814,7 @@ class ExpandVariableCursor : public Cursor { // existing_node criterions, so expand in a loop until either the input // vertex is exhausted or a valid variable-length expansion is available. while (true) { + if (context.db_accessor_.should_abort()) throw HintedAbortError(); // pop from the stack while there is stuff to pop and the current // level is exhausted while (!edges_.empty() && edges_it_.back() == edges_.back().end()) { @@ -928,8 +930,8 @@ class STShortestPathCursor : public query::plan::Cursor { if (upper_bound < 1 || lower_bound > upper_bound) continue; - if (FindPath(source, sink, lower_bound, upper_bound, &frame, - &evaluator)) { + if (FindPath(context.db_accessor_, source, sink, lower_bound, upper_bound, + &frame, &evaluator)) { return true; } } @@ -987,7 +989,8 @@ class STShortestPathCursor : public query::plan::Cursor { "Expansion condition must evaluate to boolean or null"); } - bool FindPath(const VertexAccessor &source, const VertexAccessor &sink, + bool FindPath(const database::GraphDbAccessor &dba, + const VertexAccessor &source, const VertexAccessor &sink, int64_t lower_bound, int64_t upper_bound, Frame *frame, ExpressionEvaluator *evaluator) { using utils::Contains; @@ -1022,6 +1025,7 @@ class STShortestPathCursor : public query::plan::Cursor { out_edge[sink] = std::experimental::nullopt; while (true) { + if (dba.should_abort()) throw HintedAbortError(); // Top-down step (expansion from the source). ++current_length; if (current_length > upper_bound) return false; @@ -1179,6 +1183,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { // do it all in a loop because we skip some elements while (true) { + if (context.db_accessor_.should_abort()) throw HintedAbortError(); // if we have nothing to visit on the current depth, switch to next if (to_visit_current_.empty()) to_visit_current_.swap(to_visit_next_); @@ -1347,6 +1352,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { }; while (true) { + if (context.db_accessor_.should_abort()) throw HintedAbortError(); if (pq_.empty()) { if (!input_cursor_->Pull(frame, context)) return false; auto vertex_value = frame[self_.input_symbol_]; @@ -1386,6 +1392,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { } while (!pq_.empty()) { + if (context.db_accessor_.should_abort()) throw HintedAbortError(); auto current = pq_.top(); double current_weight = std::get<0>(current); int current_depth = std::get<1>(current); @@ -1748,12 +1755,15 @@ bool Delete::DeleteCursor::Pull(Frame &frame, Context &context) { } // delete edges first - for (TypedValue &expression_result : expression_results) + for (TypedValue &expression_result : expression_results) { + if (db_.should_abort()) throw HintedAbortError(); if (expression_result.type() == TypedValue::Type::Edge) db_.RemoveEdge(expression_result.Value()); + } // delete vertices - for (TypedValue &expression_result : expression_results) + for (TypedValue &expression_result : expression_results) { + if (db_.should_abort()) throw HintedAbortError(); switch (expression_result.type()) { case TypedValue::Type::Vertex: { VertexAccessor &va = expression_result.Value(); @@ -1775,6 +1785,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, Context &context) { default: throw QueryRuntimeException("Only edges and vertices can be deleted."); } + } return true; } @@ -2225,6 +2236,7 @@ bool Accumulate::AccumulateCursor::Pull(Frame &frame, Context &context) { } } + if (db_.should_abort()) throw HintedAbortError(); if (cache_it_ == cache_.end()) return false; auto row_it = (cache_it_++)->begin(); for (const Symbol &symbol : self_.symbols_) frame[symbol] = *row_it++; @@ -2327,8 +2339,9 @@ void Aggregate::AggregateCursor::ProcessAll(Frame &frame, Context &context) { ExpressionEvaluator evaluator(&frame, context.symbol_table_, context.evaluation_context_, &context.db_accessor_, GraphView::NEW); - while (input_cursor_->Pull(frame, context)) + while (input_cursor_->Pull(frame, context)) { ProcessOne(frame, context.symbol_table_, evaluator); + } // calculate AVG aggregations (so far they have only been summed) for (int pos = 0; pos < static_cast(self_.aggregations_.size()); ++pos) { @@ -2712,6 +2725,8 @@ bool OrderBy::OrderByCursor::Pull(Frame &frame, Context &context) { if (cache_it_ == cache_.end()) return false; + if (context.db_accessor_.should_abort()) throw HintedAbortError(); + // place the output values on the frame DCHECK(self_.output_symbols_.size() == cache_it_->second.size()) << "Number of values does not match the number of output symbols " @@ -2768,38 +2783,39 @@ Merge::MergeCursor::MergeCursor(const Merge &self, merge_create_cursor_(self.merge_create_->MakeCursor(db)) {} bool Merge::MergeCursor::Pull(Frame &frame, Context &context) { - if (pull_input_) { - if (input_cursor_->Pull(frame, context)) { - // after a successful input from the input - // reset merge_match (it's expand iterators maintain state) - // and merge_create (could have a Once at the beginning) - merge_match_cursor_->Reset(); - merge_create_cursor_->Reset(); - } else - // input is exhausted, we're done - return false; - } - - // pull from the merge_match cursor - if (merge_match_cursor_->Pull(frame, context)) { - // if successful, next Pull from this should not pull_input_ - pull_input_ = false; - return true; - } else { - // failed to Pull from the merge_match cursor + while (true) { if (pull_input_) { - // if we have just now pulled from the input - // and failed to pull from merge_match, we should create - __attribute__((unused)) bool merge_create_pull_result = - merge_create_cursor_->Pull(frame, context); - DCHECK(merge_create_pull_result) << "MergeCreate must never fail"; - return true; + if (input_cursor_->Pull(frame, context)) { + // after a successful input from the input + // reset merge_match (it's expand iterators maintain state) + // and merge_create (could have a Once at the beginning) + merge_match_cursor_->Reset(); + merge_create_cursor_->Reset(); + } else + // input is exhausted, we're done + return false; + } + + // pull from the merge_match cursor + if (merge_match_cursor_->Pull(frame, context)) { + // if successful, next Pull from this should not pull_input_ + pull_input_ = false; + return true; + } else { + // failed to Pull from the merge_match cursor + if (pull_input_) { + // if we have just now pulled from the input + // and failed to pull from merge_match, we should create + __attribute__((unused)) bool merge_create_pull_result = + merge_create_cursor_->Pull(frame, context); + DCHECK(merge_create_pull_result) << "MergeCreate must never fail"; + return true; + } + // We have exhausted merge_match_cursor_ after 1 or more successful + // Pulls. Attempt next input_cursor_ pull + pull_input_ = true; + continue; } - // we have exhausted merge_match_cursor_ after 1 or more successful - // Pulls - // attempt next input_cursor_ pull - pull_input_ = true; - return Pull(frame, context); } } @@ -2849,37 +2865,39 @@ Optional::OptionalCursor::OptionalCursor(const Optional &self, optional_cursor_(self.optional_->MakeCursor(db)) {} bool Optional::OptionalCursor::Pull(Frame &frame, Context &context) { - if (pull_input_) { - if (input_cursor_->Pull(frame, context)) { - // after a successful input from the input - // reset optional_ (it's expand iterators maintain state) - optional_cursor_->Reset(); - } else - // input is exhausted, we're done - return false; - } - - // pull from the optional_ cursor - if (optional_cursor_->Pull(frame, context)) { - // if successful, next Pull from this should not pull_input_ - pull_input_ = false; - return true; - } else { - // failed to Pull from the merge_match cursor + while (true) { if (pull_input_) { - // if we have just now pulled from the input - // and failed to pull from optional_ so set the - // optional symbols to Null, ensure next time the - // input gets pulled and return true - for (const Symbol &sym : self_.optional_symbols_) - frame[sym] = TypedValue::Null; - pull_input_ = true; - return true; + if (input_cursor_->Pull(frame, context)) { + // after a successful input from the input + // reset optional_ (it's expand iterators maintain state) + optional_cursor_->Reset(); + } else + // input is exhausted, we're done + return false; + } + + // pull from the optional_ cursor + if (optional_cursor_->Pull(frame, context)) { + // if successful, next Pull from this should not pull_input_ + pull_input_ = false; + return true; + } else { + // failed to Pull from the merge_match cursor + if (pull_input_) { + // if we have just now pulled from the input + // and failed to pull from optional_ so set the + // optional symbols to Null, ensure next time the + // input gets pulled and return true + for (const Symbol &sym : self_.optional_symbols_) + frame[sym] = TypedValue::Null; + pull_input_ = true; + return true; + } + // we have exhausted optional_cursor_ after 1 or more successful Pulls + // attempt next input_cursor_ pull + pull_input_ = true; + continue; } - // we have exhausted optional_cursor_ after 1 or more successful Pulls - // attempt next input_cursor_ pull - pull_input_ = true; - return Pull(frame, context); } } @@ -2918,30 +2936,32 @@ Unwind::UnwindCursor::UnwindCursor(const Unwind &self, : self_(self), db_(db), input_cursor_(self.input_->MakeCursor(db)) {} bool Unwind::UnwindCursor::Pull(Frame &frame, Context &context) { - if (db_.should_abort()) throw HintedAbortError(); - // if we reached the end of our list of values - // pull from the input - if (input_value_it_ == input_value_.end()) { - if (!input_cursor_->Pull(frame, context)) return false; + while (true) { + if (db_.should_abort()) throw HintedAbortError(); + // if we reached the end of our list of values + // pull from the input + if (input_value_it_ == input_value_.end()) { + if (!input_cursor_->Pull(frame, context)) return false; - // successful pull from input, initialize value and iterator - ExpressionEvaluator evaluator(&frame, context.symbol_table_, - context.evaluation_context_, - &context.db_accessor_, GraphView::OLD); - TypedValue input_value = self_.input_expression_->Accept(evaluator); - if (input_value.type() != TypedValue::Type::List) - throw QueryRuntimeException( - "Argument of UNWIND must be a list, but '{}' was provided.", - input_value.type()); - input_value_ = input_value.Value>(); - input_value_it_ = input_value_.begin(); + // successful pull from input, initialize value and iterator + ExpressionEvaluator evaluator(&frame, context.symbol_table_, + context.evaluation_context_, + &context.db_accessor_, GraphView::OLD); + TypedValue input_value = self_.input_expression_->Accept(evaluator); + if (input_value.type() != TypedValue::Type::List) + throw QueryRuntimeException( + "Argument of UNWIND must be a list, but '{}' was provided.", + input_value.type()); + input_value_ = input_value.Value>(); + input_value_it_ = input_value_.begin(); + } + + // if we reached the end of our list of values goto back to top + if (input_value_it_ == input_value_.end()) continue; + + frame[self_.output_symbol_] = *input_value_it_++; + return true; } - - // if we reached the end of our list of values goto back to top - if (input_value_it_ == input_value_.end()) return Pull(frame, context); - - frame[self_.output_symbol_] = *input_value_it_++; - return true; } void Unwind::UnwindCursor::Shutdown() { input_cursor_->Shutdown(); }