Implement query execution timeout
Summary: Query execution now enforces a timeout on each Cypher query it executes. The timeout is implemented using the TSC (Time-Stamp Counter) and works only when the TSC is available (the same requirement as PROFILE). The TSC is used to mitigate the performance impact of constantly reading the current time. Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2562
This commit is contained in:
parent
673ebf428c
commit
9e04183be7
@ -91,6 +91,11 @@ DEFINE_VALIDATED_int32(
|
||||
"Interval (in milliseconds) used for flushing the audit log buffer.",
|
||||
FLAG_IN_RANGE(10, INT32_MAX));
|
||||
|
||||
// Query flags.
|
||||
DEFINE_uint64(query_execution_timeout_sec, 180,
|
||||
"Maximum allowed query execution time. Queries exceeding this "
|
||||
"limit will be aborted. Value of 0 means no limit.");
|
||||
|
||||
DEFINE_VALIDATED_string(
|
||||
query_modules_directory, "",
|
||||
"Directory where modules with custom query procedures are stored", {
|
||||
@ -179,6 +184,8 @@ void SingleNodeMain() {
|
||||
database::GraphDb db;
|
||||
#endif
|
||||
query::InterpreterContext interpreter_context{&db};
|
||||
query::SetExecutionTimeout(&interpreter_context,
|
||||
FLAGS_query_execution_timeout_sec);
|
||||
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
|
||||
|
||||
// Register modules
|
||||
@ -219,10 +226,13 @@ void SingleNodeMain() {
|
||||
#endif
|
||||
|
||||
// Handler for regular termination signals
|
||||
auto shutdown = [&server] {
|
||||
auto shutdown = [&server, &interpreter_context] {
|
||||
// Server needs to be shutdown first and then the database. This prevents a
|
||||
// race condition when a transaction is accepted during server shutdown.
|
||||
server.Shutdown();
|
||||
// After the server is notified to stop accepting and processing connections
|
||||
// we tell the execution engine to stop processing all pending queries.
|
||||
query::Shutdown(&interpreter_context);
|
||||
};
|
||||
InitSignalHandlers(shutdown);
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include "query/frontend/semantic/symbol_table.hpp"
|
||||
#include "query/parameters.hpp"
|
||||
#include "query/plan/profile.hpp"
|
||||
#include "utils/tsc.hpp"
|
||||
|
||||
namespace query {
|
||||
|
||||
@ -49,6 +50,9 @@ struct ExecutionContext {
|
||||
DbAccessor *db_accessor{nullptr};
|
||||
SymbolTable symbol_table;
|
||||
EvaluationContext evaluation_context;
|
||||
utils::TSCTimer execution_tsc_timer;
|
||||
double max_execution_time_sec{0.0};
|
||||
std::atomic<bool> *is_shutting_down{nullptr};
|
||||
bool is_profile_query{false};
|
||||
std::chrono::duration<double> profile_execution_time;
|
||||
plan::ProfilingStats stats;
|
||||
|
@ -610,8 +610,6 @@ class DbAccessor final {
|
||||
|
||||
void Abort() { accessor_->Abort(); }
|
||||
|
||||
bool MustAbort() const { return false; }
|
||||
|
||||
bool LabelIndexExists(storage::Label label) const {
|
||||
return accessor_->LabelIndexExists(label);
|
||||
}
|
||||
@ -784,8 +782,6 @@ class DbAccessor final {
|
||||
|
||||
void AdvanceCommand() { dba_->AdvanceCommand(); }
|
||||
|
||||
bool MustAbort() const { return dba_->should_abort(); }
|
||||
|
||||
bool CreateIndex(storage::Label label, storage::Property prop) {
|
||||
try {
|
||||
dba_->BuildIndex(label, prop);
|
||||
|
@ -103,7 +103,7 @@ class HintedAbortError : public utils::BasicException {
|
||||
: utils::BasicException(
|
||||
"Transaction was asked to abort, most likely because it was "
|
||||
"executing longer than time specified by "
|
||||
"--query-execution-time-sec flag.") {}
|
||||
"--query-execution-timeout-sec flag.") {}
|
||||
};
|
||||
|
||||
class ExplicitTransactionUsageException : public QueryRuntimeException {
|
||||
|
@ -471,6 +471,7 @@ ExecutionContext PullAllPlan(AnyStream *stream, const CachedPlan &plan,
|
||||
bool is_profile_query,
|
||||
std::map<std::string, TypedValue> *summary,
|
||||
DbAccessor *dba,
|
||||
InterpreterContext *interpreter_context,
|
||||
utils::MonotonicBufferResource *execution_memory) {
|
||||
auto cursor = plan.plan().MakeCursor(execution_memory);
|
||||
Frame frame(plan.symbol_table().max_position(), execution_memory);
|
||||
@ -493,6 +494,9 @@ ExecutionContext PullAllPlan(AnyStream *stream, const CachedPlan &plan,
|
||||
NamesToProperties(plan.ast_storage().properties_, dba);
|
||||
ctx.evaluation_context.labels =
|
||||
NamesToLabels(plan.ast_storage().labels_, dba);
|
||||
ctx.execution_tsc_timer = utils::TSCTimer(interpreter_context->tsc_frequency);
|
||||
ctx.max_execution_time_sec = interpreter_context->execution_timeout_sec;
|
||||
ctx.is_shutting_down = &interpreter_context->is_shutting_down;
|
||||
ctx.is_profile_query = is_profile_query;
|
||||
|
||||
utils::Timer timer;
|
||||
@ -662,9 +666,9 @@ PreparedQuery PrepareCypherQuery(
|
||||
std::move(header), std::move(parsed_query.required_privileges),
|
||||
[plan = std::move(plan), parameters = std::move(parsed_query.parameters),
|
||||
output_symbols = std::move(output_symbols), summary, dba,
|
||||
execution_memory](AnyStream *stream) {
|
||||
interpreter_context, execution_memory](AnyStream *stream) {
|
||||
PullAllPlan(stream, *plan, parameters, output_symbols, false, summary,
|
||||
dba, execution_memory);
|
||||
dba, interpreter_context, execution_memory);
|
||||
return QueryHandlerResult::COMMIT;
|
||||
}};
|
||||
}
|
||||
@ -739,7 +743,7 @@ PreparedQuery PrepareProfileQuery(
|
||||
throw ProfileInMulticommandTxException();
|
||||
}
|
||||
|
||||
if (!interpreter_context->is_tsc_available) {
|
||||
if (!interpreter_context->tsc_frequency) {
|
||||
throw QueryException("TSC support is missing for PROFILE");
|
||||
}
|
||||
|
||||
@ -768,10 +772,10 @@ PreparedQuery PrepareProfileQuery(
|
||||
std::move(parsed_query.required_privileges),
|
||||
[plan = std::move(cypher_query_plan),
|
||||
parameters = std::move(parsed_inner_query.parameters), summary, dba,
|
||||
execution_memory](AnyStream *stream) {
|
||||
interpreter_context, execution_memory](AnyStream *stream) {
|
||||
// No output symbols are given so that nothing is streamed.
|
||||
auto ctx = PullAllPlan(stream, *plan, parameters, {}, true, summary,
|
||||
dba, execution_memory);
|
||||
dba, interpreter_context, execution_memory);
|
||||
|
||||
for (const auto &row :
|
||||
ProfilingStatsToTable(ctx.stats, ctx.profile_execution_time)) {
|
||||
@ -957,9 +961,9 @@ PreparedQuery PrepareAuthQuery(
|
||||
[callback = std::move(callback), plan = std::move(plan),
|
||||
parameters = std::move(parsed_query.parameters),
|
||||
output_symbols = std::move(output_symbols), summary, dba,
|
||||
execution_memory](AnyStream *stream) {
|
||||
interpreter_context, execution_memory](AnyStream *stream) {
|
||||
PullAllPlan(stream, *plan, parameters, output_symbols, false, summary,
|
||||
dba, execution_memory);
|
||||
dba, interpreter_context, execution_memory);
|
||||
return callback.should_abort_query ? QueryHandlerResult::ABORT
|
||||
: QueryHandlerResult::COMMIT;
|
||||
}};
|
||||
|
@ -139,7 +139,10 @@ struct InterpreterContext {
|
||||
// developers introduce more bugs in each version. Fortunately, we have
|
||||
// cache so this lock probably won't impact performance much...
|
||||
utils::SpinLock antlr_lock;
|
||||
bool is_tsc_available{utils::CheckAvailableTSC()};
|
||||
std::optional<double> tsc_frequency{utils::GetTSCFrequency()};
|
||||
std::atomic<bool> is_shutting_down{false};
|
||||
// The default execution timeout is 3 minutes.
|
||||
double execution_timeout_sec{180.0};
|
||||
|
||||
auth::Auth *auth{nullptr};
|
||||
|
||||
@ -147,6 +150,17 @@ struct InterpreterContext {
|
||||
utils::SkipList<PlanCacheEntry> plan_cache;
|
||||
};
|
||||
|
||||
/// Function that is used to tell all active interpreters that they should stop
|
||||
/// their ongoing execution.
|
||||
inline void Shutdown(InterpreterContext *context) {
|
||||
context->is_shutting_down.store(true, std::memory_order_release);
|
||||
}
|
||||
|
||||
/// Function used to set the maximum execution timeout in seconds.
|
||||
inline void SetExecutionTimeout(InterpreterContext *context, double timeout) {
|
||||
context->execution_timeout_sec = timeout;
|
||||
}
|
||||
|
||||
class Interpreter final {
|
||||
public:
|
||||
explicit Interpreter(InterpreterContext *interpreter_context);
|
||||
|
@ -89,6 +89,14 @@ uint64_t ComputeProfilingKey(const T *obj) {
|
||||
return reinterpret_cast<uint64_t>(obj);
|
||||
}
|
||||
|
||||
bool MustAbort(const ExecutionContext &context) {
|
||||
return (context.is_shutting_down &&
|
||||
context.is_shutting_down->load(std::memory_order_acquire)) ||
|
||||
(context.max_execution_time_sec > 0 &&
|
||||
context.execution_tsc_timer.Elapsed() >=
|
||||
context.max_execution_time_sec);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
#define SCOPED_PROFILE_OP(name) \
|
||||
@ -314,7 +322,7 @@ class ScanAllCursor : public Cursor {
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("ScanAll");
|
||||
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
|
||||
while (!vertices_ || vertices_it_.value() == vertices_.value().end()) {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
@ -575,7 +583,7 @@ bool Expand::ExpandCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
};
|
||||
|
||||
while (true) {
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
// attempt to get a value from the incoming edges
|
||||
if (in_edges_ && *in_edges_it_ != in_edges_->end()) {
|
||||
auto edge = *(*in_edges_it_)++;
|
||||
@ -863,7 +871,7 @@ class ExpandVariableCursor : public Cursor {
|
||||
// Input Vertex could be null if it is created by a failed optional match.
|
||||
// In those cases we skip that input pull and continue with the next.
|
||||
while (true) {
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
TypedValue &vertex_value = frame[self_.input_symbol_];
|
||||
|
||||
@ -943,7 +951,7 @@ class ExpandVariableCursor : public Cursor {
|
||||
// existing_node criterions, so expand in a loop until either the input
|
||||
// vertex is exhausted or a valid variable-length expansion is available.
|
||||
while (true) {
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
// pop from the stack while there is stuff to pop and the current
|
||||
// level is exhausted
|
||||
while (!edges_.empty() && edges_it_.back() == edges_.back().end()) {
|
||||
@ -1063,7 +1071,7 @@ class STShortestPathCursor : public query::plan::Cursor {
|
||||
if (upper_bound < 1 || lower_bound > upper_bound) continue;
|
||||
|
||||
if (FindPath(*context.db_accessor, source, sink, lower_bound, upper_bound,
|
||||
&frame, &evaluator)) {
|
||||
&frame, &evaluator, context)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -1124,7 +1132,8 @@ class STShortestPathCursor : public query::plan::Cursor {
|
||||
bool FindPath(const DbAccessor &dba, const VertexAccessor &source,
|
||||
const VertexAccessor &sink, int64_t lower_bound,
|
||||
int64_t upper_bound, Frame *frame,
|
||||
ExpressionEvaluator *evaluator) {
|
||||
ExpressionEvaluator *evaluator,
|
||||
const ExecutionContext &context) {
|
||||
using utils::Contains;
|
||||
|
||||
if (source == sink) return false;
|
||||
@ -1158,7 +1167,7 @@ class STShortestPathCursor : public query::plan::Cursor {
|
||||
out_edge[sink] = std::nullopt;
|
||||
|
||||
while (true) {
|
||||
if (dba.MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
// Top-down step (expansion from the source).
|
||||
++current_length;
|
||||
if (current_length > upper_bound) return false;
|
||||
@ -1335,7 +1344,7 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor {
|
||||
|
||||
// do it all in a loop because we skip some elements
|
||||
while (true) {
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
// if we have nothing to visit on the current depth, switch to next
|
||||
if (to_visit_current_.empty()) to_visit_current_.swap(to_visit_next_);
|
||||
|
||||
@ -1515,7 +1524,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
};
|
||||
|
||||
while (true) {
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
if (pq_.empty()) {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
const auto &vertex_value = frame[self_.input_symbol_];
|
||||
@ -1554,7 +1563,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
|
||||
}
|
||||
|
||||
while (!pq_.empty()) {
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
auto current = pq_.top();
|
||||
double current_weight = std::get<0>(current);
|
||||
int current_depth = std::get<1>(current);
|
||||
@ -1926,7 +1935,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
auto &dba = *context.db_accessor;
|
||||
// delete edges first
|
||||
for (TypedValue &expression_result : expression_results) {
|
||||
if (dba.MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
if (expression_result.type() == TypedValue::Type::Edge) {
|
||||
auto maybe_error = dba.RemoveEdge(&expression_result.ValueEdge());
|
||||
if (maybe_error.HasError()) {
|
||||
@ -1947,7 +1956,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
|
||||
// delete vertices
|
||||
for (TypedValue &expression_result : expression_results) {
|
||||
if (dba.MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
switch (expression_result.type()) {
|
||||
case TypedValue::Type::Vertex: {
|
||||
auto &va = expression_result.ValueVertex();
|
||||
@ -2509,7 +2518,7 @@ class AccumulateCursor : public Cursor {
|
||||
if (self_.advance_command_) dba.AdvanceCommand();
|
||||
}
|
||||
|
||||
if (dba.MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
if (cache_it_ == cache_.end()) return false;
|
||||
auto row_it = (cache_it_++)->begin();
|
||||
for (const Symbol &symbol : self_.symbols_) frame[symbol] = *row_it++;
|
||||
@ -3071,7 +3080,7 @@ class OrderByCursor : public Cursor {
|
||||
|
||||
if (cache_it_ == cache_.end()) return false;
|
||||
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
|
||||
// place the output values on the frame
|
||||
DCHECK(self_.output_symbols_.size() == cache_it_->remember.size())
|
||||
@ -3303,7 +3312,7 @@ class UnwindCursor : public Cursor {
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("Unwind");
|
||||
while (true) {
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
// if we reached the end of our list of values
|
||||
// pull from the input
|
||||
if (input_value_it_ == input_value_.end()) {
|
||||
@ -3561,7 +3570,7 @@ class CartesianCursor : public Cursor {
|
||||
restore_frame(self_.right_symbols_, right_op_frame_);
|
||||
}
|
||||
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
|
||||
restore_frame(self_.left_symbols_, *left_op_frames_it_);
|
||||
left_op_frames_it_++;
|
||||
@ -3813,7 +3822,7 @@ class CallProcedureCursor : public Cursor {
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("CallProcedure");
|
||||
|
||||
if (context.db_accessor->MustAbort()) throw HintedAbortError();
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
|
||||
// We need to fetch new procedure results after pulling from input.
|
||||
// TODO: Look into openCypher's distinction between procedures returning an
|
||||
|
@ -3,6 +3,12 @@
|
||||
#include <sys/prctl.h>
|
||||
#include <x86intrin.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <optional>
|
||||
#include <thread>
|
||||
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
namespace utils {
|
||||
|
||||
// TSC stands for Time-Stamp Counter
|
||||
@ -67,4 +73,51 @@ inline bool CheckAvailableTSC() {
|
||||
return ret == PR_TSC_ENABLE;
|
||||
}
|
||||
|
||||
/// This function calculates the frequency at which the TSC counter increments
|
||||
/// its value. The frequency is already metered by the Linux kernel, but the
|
||||
/// value isn't reliably exposed anywhere for us to read it.
|
||||
/// https://stackoverflow.com/questions/51919219/determine-tsc-frequency-on-linux
|
||||
/// https://stackoverflow.com/questions/35123379/getting-tsc-rate-in-x86-kernel
|
||||
/// Because of that, we determine the value ourselves. We read the value two
|
||||
/// times with a delay between the two reads. The duration of the delay is
|
||||
/// determined using system calls that themselves internally use the already
|
||||
/// calibrated TSC. Because of that we get a very accurate value of the TSC
|
||||
/// frequency.
|
||||
inline std::optional<double> GetTSCFrequency() {
|
||||
if (!CheckAvailableTSC()) return std::nullopt;
|
||||
|
||||
utils::Timer timer;
|
||||
auto start_value = utils::ReadTSC();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
auto duration = timer.Elapsed().count();
|
||||
auto stop_value = utils::ReadTSC();
|
||||
|
||||
auto delta = stop_value - start_value;
|
||||
return static_cast<double>(delta) / duration;
|
||||
}
|
||||
|
||||
/// Class that is used to measure elapsed time using the TSC directly. It has
|
||||
/// almost zero overhead and is appropriate for use in performance critical
|
||||
/// paths.
|
||||
class TSCTimer {
|
||||
public:
|
||||
TSCTimer() {}
|
||||
|
||||
explicit TSCTimer(std::optional<double> frequency) : frequency_(frequency) {
|
||||
if (!frequency_) return;
|
||||
start_value_ = ReadTSC();
|
||||
}
|
||||
|
||||
double Elapsed() const {
|
||||
if (!frequency_) return 0.0;
|
||||
auto current_value = ReadTSC();
|
||||
auto delta = current_value - start_value_;
|
||||
return static_cast<double>(delta) / *frequency_;
|
||||
}
|
||||
|
||||
private:
|
||||
std::optional<double> frequency_;
|
||||
unsigned long long start_value_{0};
|
||||
};
|
||||
|
||||
} // namespace utils
|
||||
|
@ -30,7 +30,7 @@ fi
|
||||
# Start memgraph.
|
||||
$binary_dir/memgraph \
|
||||
--durability-directory=$tmpdir \
|
||||
--query-execution-time-sec=5 \
|
||||
--query-execution-timeout-sec=5 \
|
||||
--session-inactivity-timeout=10 &
|
||||
pid=$!
|
||||
wait_for_server 7687
|
||||
|
Loading…
Reference in New Issue
Block a user