From 8fb3a53b78cac75fbc1955e75aa61c2001ecc679 Mon Sep 17 00:00:00 2001 From: Lovro Lugovic Date: Wed, 1 Apr 2020 00:07:32 +0200 Subject: [PATCH] Expose query timeout checking to modules Summary: - Add the `mgp_must_abort(const mgp_graph *graph)` C API. - Add the `ProcCtx.must_abort()` Python API. The usage is very simple -- the function returns a boolean indicating whether the procedure should abort. Reviewers: mferencevic, dsantl Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2742 --- include/mg_procedure.h | 19 +++++++++++++++++++ include/mgp.py | 14 ++++++++++++++ src/query/context.hpp | 8 ++++++++ src/query/plan/operator.cpp | 10 +--------- src/query/procedure/mg_procedure_impl.cpp | 5 +++++ src/query/procedure/mg_procedure_impl.hpp | 4 ++++ src/query/procedure/py_module.cpp | 7 +++++++ 7 files changed, 58 insertions(+), 9 deletions(-) diff --git a/include/mg_procedure.h b/include/mg_procedure.h index 63ce6fc88..2a2567aba 100644 --- a/include/mg_procedure.h +++ b/include/mg_procedure.h @@ -798,6 +798,25 @@ int mgp_proc_add_deprecated_result(struct mgp_proc *proc, const char *name, const struct mgp_type *type); ///@} +/// @name Execution +/// +/// The following functions are used to control the execution of the procedure. +/// +/// @{ + +/// Return non-zero if the currently executing procedure should abort as soon as +/// possible. +/// +/// Procedures which perform heavyweight processing run the risk of running too +/// long and going over the query execution time limit. To prevent this, such +/// procedures should periodically call this function at critical points in +/// their code in order to determine whether they should abort or not. Note that +/// this mechanism is purely cooperative and depends on the procedure doing the +/// checking and aborting on its own. +int mgp_must_abort(const struct mgp_graph *graph); + +/// @} + #ifdef __cplusplus } // extern "C" #endif diff --git a/include/mgp.py b/include/mgp.py index e7ebef8e2..30fc5bec3 100644 --- a/include/mgp.py +++ b/include/mgp.py @@ -531,6 +531,11 @@ class Graph: return Vertices(self._graph) +class AbortError(Exception): + '''Signals that the procedure was asked to abort its execution.''' + pass + + class ProcCtx: '''Context of a procedure being executed. @@ -554,6 +559,15 @@ class ProcCtx: raise InvalidContextError() return self._graph + def must_abort(self) -> bool: + if not self.is_valid(): + raise InvalidContextError() + return self._graph._graph.must_abort() + + def check_must_abort(self): + if self.must_abort(): + raise AbortError + # Additional typing support diff --git a/src/query/context.hpp b/src/query/context.hpp index 923f8a6f3..85b0763da 100644 --- a/src/query/context.hpp +++ b/src/query/context.hpp @@ -59,4 +59,12 @@ struct ExecutionContext { plan::ProfilingStats *stats_root{nullptr}; }; +inline bool MustAbort(const ExecutionContext &context) { + return (context.is_shutting_down && + context.is_shutting_down->load(std::memory_order_acquire)) || + (context.max_execution_time_sec > 0 && + context.execution_tsc_timer.Elapsed() >= + context.max_execution_time_sec); +} + } // namespace query diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 45b7f3444..eaa13554a 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -89,14 +89,6 @@ uint64_t ComputeProfilingKey(const T *obj) { return reinterpret_cast(obj); } -bool MustAbort(const ExecutionContext &context) { - return (context.is_shutting_down && - context.is_shutting_down->load(std::memory_order_acquire)) || - (context.max_execution_time_sec > 0 && - context.execution_tsc_timer.Elapsed() >= - context.max_execution_time_sec); -} - } // namespace #define SCOPED_PROFILE_OP(name) \ @@ -3922,7 +3914,7 @@ class CallProcedureCursor : public Cursor { auto *memory = context.evaluation_context.memory; auto memory_limit = EvalMemoryLimit(&evaluator, self_->memory_limit_, self_->memory_scale_); - mgp_graph graph{context.db_accessor, graph_view}; + mgp_graph graph{context.db_accessor, graph_view, &context}; CallCustomProcedure(self_->procedure_name_, *proc, self_->arguments_, graph, &evaluator, memory, memory_limit, &result_); // Reset result_.signature to nullptr, because outside of this scope we diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp index 491083eaf..91f61b821 100644 --- a/src/query/procedure/mg_procedure_impl.cpp +++ b/src/query/procedure/mg_procedure_impl.cpp @@ -1478,6 +1478,11 @@ int mgp_proc_add_deprecated_result(mgp_proc *proc, const char *name, return AddResultToProc(proc, name, type, true); } +int mgp_must_abort(const mgp_graph *graph) { + CHECK(graph->ctx); + return query::MustAbort(*graph->ctx); +} + namespace query::procedure { namespace { diff --git a/src/query/procedure/mg_procedure_impl.hpp b/src/query/procedure/mg_procedure_impl.hpp index 61908466b..aec7ac4a4 100644 --- a/src/query/procedure/mg_procedure_impl.hpp +++ b/src/query/procedure/mg_procedure_impl.hpp @@ -8,6 +8,7 @@ #include #include +#include "query/context.hpp" #include "query/db_accessor.hpp" #include "query/procedure/cypher_types.hpp" #include "query/typed_value.hpp" @@ -353,6 +354,9 @@ struct mgp_result { struct mgp_graph { query::DbAccessor *impl; storage::View view; + // TODO: Merge `mgp_graph` and `mgp_memory` into a single `mgp_context`. The + // `ctx` field is out of place here. + query::ExecutionContext *ctx; }; struct mgp_properties_iterator { diff --git a/src/query/procedure/py_module.cpp b/src/query/procedure/py_module.cpp index 54c955d0d..9d3c6dd4f 100644 --- a/src/query/procedure/py_module.cpp +++ b/src/query/procedure/py_module.cpp @@ -210,6 +210,11 @@ PyObject *PyGraphIterVertices(PyGraph *self, PyObject *Py_UNUSED(ignored)) { return reinterpret_cast(py_vertices_it); } +PyObject *PyGraphMustAbort(PyGraph *self, PyObject *Py_UNUSED(ignored)) { + CHECK(self->graph); + return PyBool_FromLong(mgp_must_abort(self->graph)); +} + static PyMethodDef PyGraphMethods[] = { {"__reduce__", reinterpret_cast(DisallowPickleAndCopy), METH_NOARGS, "__reduce__ is not supported"}, @@ -222,6 +227,8 @@ static PyMethodDef PyGraphMethods[] = { METH_VARARGS, "Get the vertex or raise IndexError."}, {"iter_vertices", reinterpret_cast(PyGraphIterVertices), METH_NOARGS, "Return _mgp.VerticesIterator."}, + {"must_abort", reinterpret_cast(PyGraphMustAbort), METH_NOARGS, + "Check whether the running procedure should abort"}, {nullptr}, };