Expose query timeout checking to modules
Summary: - Add the `mgp_must_abort(const mgp_graph *graph)` C API. - Add the `ProcCtx.must_abort()` Python API. The usage is very simple -- the function returns a boolean indicating whether the procedure should abort. Reviewers: mferencevic, dsantl Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2742
This commit is contained in:
parent
a1b5bdd88f
commit
8fb3a53b78
@ -798,6 +798,25 @@ int mgp_proc_add_deprecated_result(struct mgp_proc *proc, const char *name,
|
|||||||
const struct mgp_type *type);
|
const struct mgp_type *type);
|
||||||
///@}
|
///@}
|
||||||
|
|
||||||
|
/// @name Execution
|
||||||
|
///
|
||||||
|
/// The following functions are used to control the execution of the procedure.
|
||||||
|
///
|
||||||
|
/// @{
|
||||||
|
|
||||||
|
/// Return non-zero if the currently executing procedure should abort as soon as
|
||||||
|
/// possible.
|
||||||
|
///
|
||||||
|
/// Procedures which perform heavyweight processing run the risk of running too
|
||||||
|
/// long and going over the query execution time limit. To prevent this, such
|
||||||
|
/// procedures should periodically call this function at critical points in
|
||||||
|
/// their code in order to determine whether they should abort or not. Note that
|
||||||
|
/// this mechanism is purely cooperative and depends on the procedure doing the
|
||||||
|
/// checking and aborting on its own.
|
||||||
|
int mgp_must_abort(const struct mgp_graph *graph);
|
||||||
|
|
||||||
|
/// @}
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
} // extern "C"
|
} // extern "C"
|
||||||
#endif
|
#endif
|
||||||
|
@ -531,6 +531,11 @@ class Graph:
|
|||||||
return Vertices(self._graph)
|
return Vertices(self._graph)
|
||||||
|
|
||||||
|
|
||||||
|
class AbortError(Exception):
|
||||||
|
'''Signals that the procedure was asked to abort its execution.'''
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ProcCtx:
|
class ProcCtx:
|
||||||
'''Context of a procedure being executed.
|
'''Context of a procedure being executed.
|
||||||
|
|
||||||
@ -554,6 +559,15 @@ class ProcCtx:
|
|||||||
raise InvalidContextError()
|
raise InvalidContextError()
|
||||||
return self._graph
|
return self._graph
|
||||||
|
|
||||||
|
def must_abort(self) -> bool:
|
||||||
|
if not self.is_valid():
|
||||||
|
raise InvalidContextError()
|
||||||
|
return self._graph._graph.must_abort()
|
||||||
|
|
||||||
|
def check_must_abort(self):
|
||||||
|
if self.must_abort():
|
||||||
|
raise AbortError
|
||||||
|
|
||||||
|
|
||||||
# Additional typing support
|
# Additional typing support
|
||||||
|
|
||||||
|
@ -59,4 +59,12 @@ struct ExecutionContext {
|
|||||||
plan::ProfilingStats *stats_root{nullptr};
|
plan::ProfilingStats *stats_root{nullptr};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
inline bool MustAbort(const ExecutionContext &context) {
|
||||||
|
return (context.is_shutting_down &&
|
||||||
|
context.is_shutting_down->load(std::memory_order_acquire)) ||
|
||||||
|
(context.max_execution_time_sec > 0 &&
|
||||||
|
context.execution_tsc_timer.Elapsed() >=
|
||||||
|
context.max_execution_time_sec);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace query
|
} // namespace query
|
||||||
|
@ -89,14 +89,6 @@ uint64_t ComputeProfilingKey(const T *obj) {
|
|||||||
return reinterpret_cast<uint64_t>(obj);
|
return reinterpret_cast<uint64_t>(obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MustAbort(const ExecutionContext &context) {
|
|
||||||
return (context.is_shutting_down &&
|
|
||||||
context.is_shutting_down->load(std::memory_order_acquire)) ||
|
|
||||||
(context.max_execution_time_sec > 0 &&
|
|
||||||
context.execution_tsc_timer.Elapsed() >=
|
|
||||||
context.max_execution_time_sec);
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
#define SCOPED_PROFILE_OP(name) \
|
#define SCOPED_PROFILE_OP(name) \
|
||||||
@ -3922,7 +3914,7 @@ class CallProcedureCursor : public Cursor {
|
|||||||
auto *memory = context.evaluation_context.memory;
|
auto *memory = context.evaluation_context.memory;
|
||||||
auto memory_limit = EvalMemoryLimit(&evaluator, self_->memory_limit_,
|
auto memory_limit = EvalMemoryLimit(&evaluator, self_->memory_limit_,
|
||||||
self_->memory_scale_);
|
self_->memory_scale_);
|
||||||
mgp_graph graph{context.db_accessor, graph_view};
|
mgp_graph graph{context.db_accessor, graph_view, &context};
|
||||||
CallCustomProcedure(self_->procedure_name_, *proc, self_->arguments_,
|
CallCustomProcedure(self_->procedure_name_, *proc, self_->arguments_,
|
||||||
graph, &evaluator, memory, memory_limit, &result_);
|
graph, &evaluator, memory, memory_limit, &result_);
|
||||||
// Reset result_.signature to nullptr, because outside of this scope we
|
// Reset result_.signature to nullptr, because outside of this scope we
|
||||||
|
@ -1478,6 +1478,11 @@ int mgp_proc_add_deprecated_result(mgp_proc *proc, const char *name,
|
|||||||
return AddResultToProc(proc, name, type, true);
|
return AddResultToProc(proc, name, type, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int mgp_must_abort(const mgp_graph *graph) {
|
||||||
|
CHECK(graph->ctx);
|
||||||
|
return query::MustAbort(*graph->ctx);
|
||||||
|
}
|
||||||
|
|
||||||
namespace query::procedure {
|
namespace query::procedure {
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
@ -8,6 +8,7 @@
|
|||||||
#include <optional>
|
#include <optional>
|
||||||
#include <ostream>
|
#include <ostream>
|
||||||
|
|
||||||
|
#include "query/context.hpp"
|
||||||
#include "query/db_accessor.hpp"
|
#include "query/db_accessor.hpp"
|
||||||
#include "query/procedure/cypher_types.hpp"
|
#include "query/procedure/cypher_types.hpp"
|
||||||
#include "query/typed_value.hpp"
|
#include "query/typed_value.hpp"
|
||||||
@ -353,6 +354,9 @@ struct mgp_result {
|
|||||||
struct mgp_graph {
|
struct mgp_graph {
|
||||||
query::DbAccessor *impl;
|
query::DbAccessor *impl;
|
||||||
storage::View view;
|
storage::View view;
|
||||||
|
// TODO: Merge `mgp_graph` and `mgp_memory` into a single `mgp_context`. The
|
||||||
|
// `ctx` field is out of place here.
|
||||||
|
query::ExecutionContext *ctx;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct mgp_properties_iterator {
|
struct mgp_properties_iterator {
|
||||||
|
@ -210,6 +210,11 @@ PyObject *PyGraphIterVertices(PyGraph *self, PyObject *Py_UNUSED(ignored)) {
|
|||||||
return reinterpret_cast<PyObject *>(py_vertices_it);
|
return reinterpret_cast<PyObject *>(py_vertices_it);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PyObject *PyGraphMustAbort(PyGraph *self, PyObject *Py_UNUSED(ignored)) {
|
||||||
|
CHECK(self->graph);
|
||||||
|
return PyBool_FromLong(mgp_must_abort(self->graph));
|
||||||
|
}
|
||||||
|
|
||||||
static PyMethodDef PyGraphMethods[] = {
|
static PyMethodDef PyGraphMethods[] = {
|
||||||
{"__reduce__", reinterpret_cast<PyCFunction>(DisallowPickleAndCopy),
|
{"__reduce__", reinterpret_cast<PyCFunction>(DisallowPickleAndCopy),
|
||||||
METH_NOARGS, "__reduce__ is not supported"},
|
METH_NOARGS, "__reduce__ is not supported"},
|
||||||
@ -222,6 +227,8 @@ static PyMethodDef PyGraphMethods[] = {
|
|||||||
METH_VARARGS, "Get the vertex or raise IndexError."},
|
METH_VARARGS, "Get the vertex or raise IndexError."},
|
||||||
{"iter_vertices", reinterpret_cast<PyCFunction>(PyGraphIterVertices),
|
{"iter_vertices", reinterpret_cast<PyCFunction>(PyGraphIterVertices),
|
||||||
METH_NOARGS, "Return _mgp.VerticesIterator."},
|
METH_NOARGS, "Return _mgp.VerticesIterator."},
|
||||||
|
{"must_abort", reinterpret_cast<PyCFunction>(PyGraphMustAbort), METH_NOARGS,
|
||||||
|
"Check whether the running procedure should abort"},
|
||||||
{nullptr},
|
{nullptr},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user