Use timers for query timeout thread (#163)
Co-authored-by: Benjamin Antal <benjamin.antal@memgraph.io>
This commit is contained in:
parent
cbf826e0c3
commit
15911b64dc
@ -1,11 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
#include "query/common.hpp"
|
||||
#include "query/frontend/semantic/symbol_table.hpp"
|
||||
#include "query/parameters.hpp"
|
||||
#include "query/plan/profile.hpp"
|
||||
#include "query/trigger.hpp"
|
||||
#include "utils/tsc.hpp"
|
||||
#include "utils/async_timer.hpp"
|
||||
|
||||
namespace query {
|
||||
|
||||
@ -50,20 +52,25 @@ struct ExecutionContext {
|
||||
DbAccessor *db_accessor{nullptr};
|
||||
SymbolTable symbol_table;
|
||||
EvaluationContext evaluation_context;
|
||||
utils::TSCTimer execution_tsc_timer;
|
||||
double max_execution_time_sec{0.0};
|
||||
std::atomic<bool> *is_shutting_down{nullptr};
|
||||
bool is_profile_query{false};
|
||||
std::chrono::duration<double> profile_execution_time;
|
||||
plan::ProfilingStats stats;
|
||||
plan::ProfilingStats *stats_root{nullptr};
|
||||
TriggerContextCollector *trigger_context_collector{nullptr};
|
||||
utils::AsyncTimer timer;
|
||||
};
|
||||
|
||||
static_assert(std::is_move_assignable_v<ExecutionContext>, "ExecutionContext must be move assignable!");
|
||||
static_assert(std::is_move_constructible_v<ExecutionContext>, "ExecutionContext must be move constructible!");
|
||||
|
||||
inline bool MustAbort(const ExecutionContext &context) {
|
||||
return (context.is_shutting_down && context.is_shutting_down->load(std::memory_order_acquire)) ||
|
||||
(context.max_execution_time_sec > 0 &&
|
||||
context.execution_tsc_timer.Elapsed() >= context.max_execution_time_sec);
|
||||
return (context.is_shutting_down != nullptr && context.is_shutting_down->load(std::memory_order_acquire)) ||
|
||||
context.timer.IsExpired();
|
||||
}
|
||||
|
||||
inline plan::ProfilingStatsWithTotalTime GetStatsWithTotalTime(const ExecutionContext &context) {
|
||||
return plan::ProfilingStatsWithTotalTime{context.stats, context.profile_execution_time};
|
||||
}
|
||||
|
||||
} // namespace query
|
||||
|
@ -473,9 +473,9 @@ struct PullPlan {
|
||||
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
|
||||
TriggerContextCollector *trigger_context_collector = nullptr,
|
||||
std::optional<size_t> memory_limit = {});
|
||||
std::optional<ExecutionContext> Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
|
||||
private:
|
||||
std::shared_ptr<CachedPlan> plan_ = nullptr;
|
||||
@ -513,16 +513,17 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
ctx_.evaluation_context.parameters = parameters;
|
||||
ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, dba);
|
||||
ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba);
|
||||
ctx_.execution_tsc_timer = utils::TSCTimer(interpreter_context->tsc_frequency);
|
||||
ctx_.max_execution_time_sec = interpreter_context->execution_timeout_sec;
|
||||
if (interpreter_context->execution_timeout_sec > 0) {
|
||||
ctx_.timer = utils::AsyncTimer{interpreter_context->execution_timeout_sec};
|
||||
}
|
||||
ctx_.is_shutting_down = &interpreter_context->is_shutting_down;
|
||||
ctx_.is_profile_query = is_profile_query;
|
||||
ctx_.trigger_context_collector = trigger_context_collector;
|
||||
}
|
||||
|
||||
std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
// Set up temporary memory for a single Pull. Initial memory comes from the
|
||||
// stack. 256 KiB should fit on the stack and should be more than enough for a
|
||||
// single `Pull`.
|
||||
@ -595,7 +596,7 @@ std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<
|
||||
summary->insert_or_assign("plan_execution_time", execution_time_.count());
|
||||
cursor_->Shutdown();
|
||||
ctx_.profile_execution_time = execution_time_;
|
||||
return std::move(ctx_);
|
||||
return GetStatsWithTotalTime(ctx_);
|
||||
}
|
||||
|
||||
using RWType = plan::ReadWriteTypeChecker::RWType;
|
||||
@ -828,32 +829,33 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
|
||||
auto rw_type_checker = plan::ReadWriteTypeChecker();
|
||||
rw_type_checker.InferRWType(const_cast<plan::LogicalOperator &>(cypher_query_plan->plan()));
|
||||
|
||||
return PreparedQuery{
|
||||
{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"},
|
||||
std::move(parsed_query.required_privileges),
|
||||
[plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), summary, dba,
|
||||
interpreter_context, execution_memory, memory_limit,
|
||||
// We want to execute the query we are profiling lazily, so we delay
|
||||
// the construction of the corresponding context.
|
||||
ctx = std::optional<ExecutionContext>{}, pull_plan = std::shared_ptr<PullPlanVector>(nullptr)](
|
||||
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
|
||||
// No output symbols are given so that nothing is streamed.
|
||||
if (!ctx) {
|
||||
ctx = PullPlan(plan, parameters, true, dba, interpreter_context, execution_memory, nullptr, memory_limit)
|
||||
.Pull(stream, {}, {}, summary);
|
||||
pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(ctx->stats, ctx->profile_execution_time));
|
||||
}
|
||||
return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"},
|
||||
std::move(parsed_query.required_privileges),
|
||||
[plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters),
|
||||
summary, dba, interpreter_context, execution_memory, memory_limit,
|
||||
// We want to execute the query we are profiling lazily, so we delay
|
||||
// the construction of the corresponding context.
|
||||
stats_and_total_time = std::optional<plan::ProfilingStatsWithTotalTime>{},
|
||||
pull_plan = std::shared_ptr<PullPlanVector>(nullptr)](
|
||||
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
|
||||
// No output symbols are given so that nothing is streamed.
|
||||
if (!stats_and_total_time) {
|
||||
stats_and_total_time = PullPlan(plan, parameters, true, dba, interpreter_context,
|
||||
execution_memory, nullptr, memory_limit)
|
||||
.Pull(stream, {}, {}, summary);
|
||||
pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(*stats_and_total_time));
|
||||
}
|
||||
|
||||
MG_ASSERT(ctx, "Failed to execute the query!");
|
||||
MG_ASSERT(stats_and_total_time, "Failed to execute the query!");
|
||||
|
||||
if (pull_plan->Pull(stream, n)) {
|
||||
summary->insert_or_assign("profile", ProfilingStatsToJson(ctx->stats, ctx->profile_execution_time).dump());
|
||||
return QueryHandlerResult::ABORT;
|
||||
}
|
||||
if (pull_plan->Pull(stream, n)) {
|
||||
summary->insert_or_assign("profile", ProfilingStatsToJson(*stats_and_total_time).dump());
|
||||
return QueryHandlerResult::ABORT;
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
},
|
||||
rw_type_checker.type};
|
||||
return std::nullopt;
|
||||
},
|
||||
rw_type_checker.type};
|
||||
}
|
||||
|
||||
PreparedQuery PrepareDumpQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary, DbAccessor *dba,
|
||||
@ -1602,9 +1604,8 @@ void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, Interpret
|
||||
|
||||
trigger_context.AdaptForAccessor(&db_accessor);
|
||||
try {
|
||||
trigger.Execute(&db_accessor, &execution_memory, *interpreter_context->tsc_frequency,
|
||||
interpreter_context->execution_timeout_sec, &interpreter_context->is_shutting_down,
|
||||
trigger_context);
|
||||
trigger.Execute(&db_accessor, &execution_memory, interpreter_context->execution_timeout_sec,
|
||||
&interpreter_context->is_shutting_down, trigger_context);
|
||||
} catch (const utils::BasicException &exception) {
|
||||
spdlog::warn("Trigger '{}' failed with exception:\n{}", trigger.Name(), exception.what());
|
||||
db_accessor.Abort();
|
||||
@ -1658,9 +1659,8 @@ void Interpreter::Commit() {
|
||||
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
|
||||
AdvanceCommand();
|
||||
try {
|
||||
trigger.Execute(&*execution_db_accessor_, &execution_memory, *interpreter_context_->tsc_frequency,
|
||||
interpreter_context_->execution_timeout_sec, &interpreter_context_->is_shutting_down,
|
||||
*trigger_context);
|
||||
trigger.Execute(&*execution_db_accessor_, &execution_memory, interpreter_context_->execution_timeout_sec,
|
||||
&interpreter_context_->is_shutting_down, *trigger_context);
|
||||
} catch (const utils::BasicException &e) {
|
||||
throw utils::BasicException(
|
||||
fmt::format("Trigger '{}' caused the transaction to fail.\nException: {}", trigger.Name(), e.what()));
|
||||
|
@ -98,10 +98,9 @@ class ProfilingStatsToTableHelper {
|
||||
|
||||
} // namespace
|
||||
|
||||
std::vector<std::vector<TypedValue>> ProfilingStatsToTable(const ProfilingStats &cumulative_stats,
|
||||
std::chrono::duration<double> total_time) {
|
||||
ProfilingStatsToTableHelper helper{cumulative_stats.num_cycles, total_time};
|
||||
helper.Output(cumulative_stats);
|
||||
std::vector<std::vector<TypedValue>> ProfilingStatsToTable(const ProfilingStatsWithTotalTime &stats) {
|
||||
ProfilingStatsToTableHelper helper{stats.cumulative_stats.num_cycles, stats.total_time};
|
||||
helper.Output(stats.cumulative_stats);
|
||||
return helper.rows();
|
||||
}
|
||||
|
||||
@ -147,9 +146,9 @@ class ProfilingStatsToJsonHelper {
|
||||
|
||||
} // namespace
|
||||
|
||||
nlohmann::json ProfilingStatsToJson(const ProfilingStats &cumulative_stats, std::chrono::duration<double> total_time) {
|
||||
ProfilingStatsToJsonHelper helper{cumulative_stats.num_cycles, total_time};
|
||||
helper.Output(cumulative_stats);
|
||||
nlohmann::json ProfilingStatsToJson(const ProfilingStatsWithTotalTime &stats) {
|
||||
ProfilingStatsToJsonHelper helper{stats.cumulative_stats.num_cycles, stats.total_time};
|
||||
helper.Output(stats.cumulative_stats);
|
||||
return helper.ToJson();
|
||||
}
|
||||
|
||||
|
@ -23,10 +23,14 @@ struct ProfilingStats {
|
||||
std::vector<ProfilingStats> children;
|
||||
};
|
||||
|
||||
std::vector<std::vector<TypedValue>> ProfilingStatsToTable(const ProfilingStats &cumulative_stats,
|
||||
std::chrono::duration<double>);
|
||||
struct ProfilingStatsWithTotalTime {
|
||||
ProfilingStats cumulative_stats{};
|
||||
std::chrono::duration<double> total_time{};
|
||||
};
|
||||
|
||||
nlohmann::json ProfilingStatsToJson(const ProfilingStats &cumulative_stats, std::chrono::duration<double>);
|
||||
std::vector<std::vector<TypedValue>> ProfilingStatsToTable(const ProfilingStatsWithTotalTime &stats);
|
||||
|
||||
nlohmann::json ProfilingStatsToJson(const ProfilingStatsWithTotalTime &stats);
|
||||
|
||||
} // namespace plan
|
||||
} // namespace query
|
||||
|
@ -172,7 +172,7 @@ std::shared_ptr<Trigger::TriggerPlan> Trigger::GetPlan(DbAccessor *db_accessor)
|
||||
return trigger_plan_;
|
||||
}
|
||||
|
||||
void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, const double tsc_frequency,
|
||||
void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory,
|
||||
const double max_execution_time_sec, std::atomic<bool> *is_shutting_down,
|
||||
const TriggerContext &context) const {
|
||||
if (!context.ShouldEventTrigger(event_type_)) {
|
||||
@ -193,8 +193,7 @@ void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution
|
||||
ctx.evaluation_context.parameters = parsed_statements_.parameters;
|
||||
ctx.evaluation_context.properties = NamesToProperties(plan.ast_storage().properties_, dba);
|
||||
ctx.evaluation_context.labels = NamesToLabels(plan.ast_storage().labels_, dba);
|
||||
ctx.execution_tsc_timer = utils::TSCTimer(tsc_frequency);
|
||||
ctx.max_execution_time_sec = max_execution_time_sec;
|
||||
ctx.timer = utils::AsyncTimer(max_execution_time_sec);
|
||||
ctx.is_shutting_down = is_shutting_down;
|
||||
ctx.is_profile_query = false;
|
||||
|
||||
|
@ -23,8 +23,8 @@ struct Trigger {
|
||||
const std::map<std::string, storage::PropertyValue> &user_parameters, TriggerEventType event_type,
|
||||
utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor, utils::SpinLock *antlr_lock);
|
||||
|
||||
void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double tsc_frequency,
|
||||
double max_execution_time_sec, std::atomic<bool> *is_shutting_down, const TriggerContext &context) const;
|
||||
void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double max_execution_time_sec,
|
||||
std::atomic<bool> *is_shutting_down, const TriggerContext &context) const;
|
||||
|
||||
bool operator==(const Trigger &other) const { return name_ == other.name_; }
|
||||
// NOLINTNEXTLINE (modernize-use-nullptr)
|
||||
|
@ -1,4 +1,5 @@
|
||||
set(utils_src_files
|
||||
async_timer.cpp
|
||||
event_counter.cpp
|
||||
csv_parsing.cpp
|
||||
file.cpp
|
||||
@ -13,7 +14,7 @@ set(utils_src_files
|
||||
uuid.cpp)
|
||||
|
||||
add_library(mg-utils STATIC ${utils_src_files})
|
||||
target_link_libraries(mg-utils stdc++fs Threads::Threads spdlog fmt gflags uuid)
|
||||
target_link_libraries(mg-utils stdc++fs Threads::Threads spdlog fmt gflags uuid rt)
|
||||
|
||||
add_library(mg-new-delete STATIC new_delete.cpp)
|
||||
target_link_libraries(mg-new-delete jemalloc fmt)
|
||||
|
187
src/utils/async_timer.cpp
Normal file
187
src/utils/async_timer.cpp
Normal file
@ -0,0 +1,187 @@
|
||||
#include "utils/async_timer.hpp"
|
||||
|
||||
#include <csignal>
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <cmath>
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
|
||||
#include "utils/skip_list.hpp"
|
||||
#include "utils/spin_lock.hpp"
|
||||
#include "utils/synchronized.hpp"
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr uint64_t kInvalidFlagId = 0U;
|
||||
// std::numeric_limits<time_t>::max() cannot be represented precisely as a double, so the next smallest value is the
|
||||
// maximum number of seconds the timer can be used with
|
||||
const double max_seconds_as_double = std::nexttoward(std::numeric_limits<time_t>::max(), 0.0);
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
std::atomic<uint64_t> expiration_flag_counter{kInvalidFlagId + 1U};
|
||||
|
||||
struct ExpirationFlagInfo {
|
||||
uint64_t id{0U};
|
||||
std::weak_ptr<std::atomic<bool>> flag{};
|
||||
};
|
||||
|
||||
bool operator==(const ExpirationFlagInfo &lhs, const ExpirationFlagInfo &rhs) { return lhs.id == rhs.id; }
|
||||
bool operator<(const ExpirationFlagInfo &lhs, const ExpirationFlagInfo &rhs) { return lhs.id < rhs.id; }
|
||||
bool operator==(const ExpirationFlagInfo &flag_info, const uint64_t id) { return flag_info.id == id; }
|
||||
bool operator<(const ExpirationFlagInfo &flag_info, const uint64_t id) { return flag_info.id < id; }
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
utils::SkipList<ExpirationFlagInfo> expiration_flags{};
|
||||
|
||||
uint64_t AddFlag(std::weak_ptr<std::atomic<bool>> flag) {
|
||||
const auto id = expiration_flag_counter.fetch_add(1, std::memory_order_relaxed);
|
||||
expiration_flags.access().insert({id, std::move(flag)});
|
||||
return id;
|
||||
}
|
||||
|
||||
void EraseFlag(uint64_t flag_id) { expiration_flags.access().remove(flag_id); }
|
||||
|
||||
std::weak_ptr<std::atomic<bool>> GetFlag(uint64_t flag_id) {
|
||||
const auto flag_accessor = expiration_flags.access();
|
||||
const auto it = flag_accessor.find(flag_id);
|
||||
if (it == flag_accessor.end()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return it->flag;
|
||||
}
|
||||
|
||||
void MarkDone(const uint64_t flag_id) {
|
||||
const auto weak_flag = GetFlag(flag_id);
|
||||
if (weak_flag.expired()) {
|
||||
return;
|
||||
}
|
||||
auto flag = weak_flag.lock();
|
||||
if (flag != nullptr) {
|
||||
flag->store(true, std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace utils {
|
||||
|
||||
namespace {
|
||||
struct ThreadInfo {
|
||||
pid_t thread_id;
|
||||
std::atomic<bool> setup_done{false};
|
||||
};
|
||||
|
||||
void *TimerBackgroundWorker(void *args) {
|
||||
auto *thread_info = static_cast<ThreadInfo *>(args);
|
||||
thread_info->thread_id = syscall(SYS_gettid);
|
||||
thread_info->setup_done.store(true, std::memory_order_release);
|
||||
|
||||
sigset_t ss;
|
||||
sigemptyset(&ss);
|
||||
sigaddset(&ss, SIGTIMER);
|
||||
sigprocmask(SIG_BLOCK, &ss, nullptr);
|
||||
|
||||
while (true) {
|
||||
siginfo_t si;
|
||||
int result = sigwaitinfo(&ss, &si);
|
||||
|
||||
if (result <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (si.si_code == SI_TIMER) {
|
||||
auto flag_id = kInvalidFlagId;
|
||||
std::memcpy(&flag_id, &si.si_value.sival_ptr, sizeof(flag_id));
|
||||
MarkDone(flag_id);
|
||||
} else if (si.si_code == SI_TKILL) {
|
||||
pthread_exit(nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
AsyncTimer::AsyncTimer() : flag_id_{kInvalidFlagId} {};
|
||||
|
||||
AsyncTimer::AsyncTimer(double seconds)
|
||||
: expiration_flag_{std::make_shared<std::atomic<bool>>(false)}, flag_id_{kInvalidFlagId}, timer_id_{} {
|
||||
MG_ASSERT(seconds <= max_seconds_as_double,
|
||||
"The AsyncTimer cannot handle larger time values than {:f}, the specified value: {:f}",
|
||||
max_seconds_as_double, seconds);
|
||||
MG_ASSERT(seconds >= 0.0, "The AsyncTimer cannot handle negative time values: {:f}", seconds);
|
||||
|
||||
static pthread_t background_timer_thread;
|
||||
static ThreadInfo thread_info;
|
||||
static std::once_flag timer_thread_setup_flag;
|
||||
|
||||
std::call_once(timer_thread_setup_flag, [] {
|
||||
pthread_create(&background_timer_thread, nullptr, TimerBackgroundWorker, &thread_info);
|
||||
while (!thread_info.setup_done.load(std::memory_order_acquire))
|
||||
;
|
||||
});
|
||||
|
||||
flag_id_ = AddFlag(std::weak_ptr<std::atomic<bool>>{expiration_flag_});
|
||||
|
||||
sigevent notification_settings{};
|
||||
notification_settings.sigev_notify = SIGEV_THREAD_ID;
|
||||
notification_settings.sigev_signo = SIGTIMER;
|
||||
notification_settings._sigev_un._tid = thread_info.thread_id;
|
||||
static_assert(sizeof(void *) == sizeof(flag_id_), "ID size must be equal to pointer size!");
|
||||
std::memcpy(¬ification_settings.sigev_value.sival_ptr, &flag_id_, sizeof(flag_id_));
|
||||
MG_ASSERT(timer_create(CLOCK_MONOTONIC, ¬ification_settings, &timer_id_) == 0, "Couldn't create timer: ({}) {}",
|
||||
errno, strerror(errno));
|
||||
|
||||
constexpr auto kSecondsToNanos = 1000 * 1000 * 1000;
|
||||
// Casting will truncate down, but that's exactly what we want.
|
||||
const auto second_as_time_t = static_cast<time_t>(seconds);
|
||||
const auto remaining_nano_seconds = static_cast<time_t>((seconds - second_as_time_t) * kSecondsToNanos);
|
||||
|
||||
struct itimerspec spec;
|
||||
spec.it_interval.tv_sec = 0;
|
||||
spec.it_interval.tv_nsec = 0;
|
||||
spec.it_value.tv_sec = second_as_time_t;
|
||||
spec.it_value.tv_nsec = remaining_nano_seconds;
|
||||
|
||||
MG_ASSERT(timer_settime(timer_id_, 0, &spec, nullptr) == 0, "Couldn't set timer: ({}) {}", errno, strerror(errno));
|
||||
}
|
||||
|
||||
AsyncTimer::~AsyncTimer() { ReleaseResources(); }
|
||||
|
||||
AsyncTimer::AsyncTimer(AsyncTimer &&other) noexcept
|
||||
: expiration_flag_{std::move(other.expiration_flag_)}, flag_id_{other.flag_id_}, timer_id_{other.timer_id_} {
|
||||
other.flag_id_ = kInvalidFlagId;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE (hicpp-noexcept-move)
|
||||
AsyncTimer &AsyncTimer::operator=(AsyncTimer &&other) {
|
||||
if (this == &other) {
|
||||
return *this;
|
||||
}
|
||||
|
||||
ReleaseResources();
|
||||
|
||||
expiration_flag_ = std::move(other.expiration_flag_);
|
||||
flag_id_ = std::exchange(other.flag_id_, kInvalidFlagId);
|
||||
timer_id_ = other.timer_id_;
|
||||
|
||||
return *this;
|
||||
};
|
||||
|
||||
bool AsyncTimer::IsExpired() const {
|
||||
if (expiration_flag_ != nullptr) {
|
||||
return expiration_flag_->load(std::memory_order_relaxed);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void AsyncTimer::ReleaseResources() {
|
||||
if (expiration_flag_ != nullptr) {
|
||||
timer_delete(timer_id_);
|
||||
EraseFlag(flag_id_);
|
||||
flag_id_ = kInvalidFlagId;
|
||||
expiration_flag_ = std::shared_ptr<std::atomic<bool>>{};
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace utils
|
37
src/utils/async_timer.hpp
Normal file
37
src/utils/async_timer.hpp
Normal file
@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
#include <time.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
namespace utils {
|
||||
|
||||
#define SIGTIMER (SIGRTMAX - 2)
|
||||
|
||||
class AsyncTimer {
|
||||
public:
|
||||
AsyncTimer();
|
||||
explicit AsyncTimer(double seconds);
|
||||
~AsyncTimer();
|
||||
AsyncTimer(AsyncTimer &&other) noexcept;
|
||||
// NOLINTNEXTLINE (hicpp-noexcept-move)
|
||||
AsyncTimer &operator=(AsyncTimer &&other);
|
||||
|
||||
AsyncTimer(const AsyncTimer &) = delete;
|
||||
AsyncTimer &operator=(const AsyncTimer &) = delete;
|
||||
|
||||
// Returns false if the object isn't associated with any timer.
|
||||
bool IsExpired() const;
|
||||
|
||||
private:
|
||||
void ReleaseResources();
|
||||
|
||||
// If the expiration_flag_ is nullptr, then the object is not associated with any timer, therefore no clean up
|
||||
// is necessary. Furthermore, the the POSIX API doesn't specify any value as "invalid" for timer_t, so the timer_id_
|
||||
// cannot be used to determine whether the object is associated with any timer or not.
|
||||
std::shared_ptr<std::atomic<bool>> expiration_flag_;
|
||||
uint64_t flag_id_;
|
||||
timer_t timer_id_;
|
||||
};
|
||||
} // namespace utils
|
@ -666,7 +666,7 @@ class SkipList final {
|
||||
/// @return Iterator to the item in the list, will be equal to `end()` when
|
||||
/// the key isn't found
|
||||
template <typename TKey>
|
||||
Iterator find(const TKey &key) {
|
||||
Iterator find(const TKey &key) const {
|
||||
return skiplist_->template find(key);
|
||||
}
|
||||
|
||||
@ -676,7 +676,7 @@ class SkipList final {
|
||||
/// @return Iterator to the item in the list, will be equal to `end()` when
|
||||
/// no items match the search
|
||||
template <typename TKey>
|
||||
Iterator find_equal_or_greater(const TKey &key) {
|
||||
Iterator find_equal_or_greater(const TKey &key) const {
|
||||
return skiplist_->template find_equal_or_greater(key);
|
||||
}
|
||||
|
||||
|
@ -238,6 +238,9 @@ target_link_libraries(${test_prefix}utils_thread_pool mg-utils fmt)
|
||||
add_unit_test(utils_csv_parsing.cpp ${CMAKE_SOURCE_DIR}/src/utils/csv_parsing.cpp)
|
||||
target_link_libraries(${test_prefix}utils_csv_parsing mg-utils fmt)
|
||||
|
||||
add_unit_test(utils_async_timer.cpp)
|
||||
target_link_libraries(${test_prefix}utils_async_timer mg-utils)
|
||||
|
||||
# Test mg-storage-v2
|
||||
|
||||
add_unit_test(commit_log_v2.cpp)
|
||||
|
@ -19,7 +19,7 @@ TEST(QueryProfileTest, SimpleQuery) {
|
||||
// | * Once | 2 | 25.000000 % | 0.250000 ms |
|
||||
// +---------------+---------------+---------------+---------------+
|
||||
// clang-format: on
|
||||
auto table = ProfilingStatsToTable(produce, total_time);
|
||||
auto table = ProfilingStatsToTable(ProfilingStatsWithTotalTime{produce, total_time});
|
||||
|
||||
EXPECT_EQ(table[0][0].ValueString(), "* Produce");
|
||||
EXPECT_EQ(table[0][1].ValueInt(), 2);
|
||||
@ -48,7 +48,7 @@ TEST(QueryProfileTest, SimpleQuery) {
|
||||
// "relative_time": 0.75
|
||||
// }
|
||||
// clang-format: on
|
||||
auto json = ProfilingStatsToJson(produce, total_time);
|
||||
auto json = ProfilingStatsToJson(ProfilingStatsWithTotalTime{produce, total_time});
|
||||
|
||||
/*
|
||||
* NOTE: When one of these comparions fails and Google Test tries to report
|
||||
@ -94,7 +94,7 @@ TEST(QueryProfileTest, ComplicatedQuery) {
|
||||
// | * Once (1) | 2 | 5.000000 % | 0.050000 ms |
|
||||
// +----------------+----------------+----------------+----------------+
|
||||
// clang-format: on
|
||||
auto table = ProfilingStatsToTable(produce, total_time);
|
||||
auto table = ProfilingStatsToTable({produce, total_time});
|
||||
|
||||
EXPECT_EQ(table[0][0].ValueString(), "* Produce");
|
||||
EXPECT_EQ(table[0][1].ValueInt(), 2);
|
||||
@ -209,7 +209,7 @@ TEST(QueryProfileTest, ComplicatedQuery) {
|
||||
// "relative_time": 0.1,
|
||||
// }
|
||||
// clang-format: on
|
||||
auto json = ProfilingStatsToJson(produce, total_time);
|
||||
auto json = ProfilingStatsToJson(ProfilingStatsWithTotalTime{produce, total_time});
|
||||
|
||||
EXPECT_EQ(json["actual_hits"], 2);
|
||||
EXPECT_EQ(json["relative_time"], 0.1);
|
||||
|
138
tests/unit/utils_async_timer.cpp
Normal file
138
tests/unit/utils_async_timer.cpp
Normal file
@ -0,0 +1,138 @@
|
||||
#include <chrono>
|
||||
#include <cmath>
|
||||
#include <limits>
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "utils/async_timer.hpp"
|
||||
|
||||
using AsyncTimer = utils::AsyncTimer;
|
||||
|
||||
constexpr auto kSecondsInMilis = 1000.0;
|
||||
constexpr auto kIntervalInSeconds = 0.3;
|
||||
constexpr auto kIntervalInMilis = kIntervalInSeconds * kSecondsInMilis;
|
||||
constexpr auto kAbsoluteErrorInMilis = 50;
|
||||
|
||||
std::chrono::steady_clock::time_point Now() { return std::chrono::steady_clock::now(); }
|
||||
|
||||
int ElapsedMilis(const std::chrono::steady_clock::time_point &start, const std::chrono::steady_clock::time_point &end) {
|
||||
return std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
|
||||
}
|
||||
|
||||
void CheckTimeSimple() {
|
||||
const auto before = Now();
|
||||
AsyncTimer timer{kIntervalInSeconds};
|
||||
while (!timer.IsExpired()) {
|
||||
ASSERT_LT(ElapsedMilis(before, Now()), 2 * kIntervalInMilis);
|
||||
}
|
||||
|
||||
const auto after = Now();
|
||||
|
||||
EXPECT_NEAR(ElapsedMilis(before, after), kIntervalInMilis, kAbsoluteErrorInMilis);
|
||||
}
|
||||
|
||||
TEST(AsyncTimer, SimpleWait) { CheckTimeSimple(); }
|
||||
|
||||
TEST(AsyncTimer, DoubleWait) {
|
||||
CheckTimeSimple();
|
||||
CheckTimeSimple();
|
||||
}
|
||||
|
||||
TEST(AsyncTimer, MoveConstruct) {
|
||||
const auto before = Now();
|
||||
AsyncTimer timer_1{kIntervalInSeconds};
|
||||
AsyncTimer timer_2{std::move(timer_1)};
|
||||
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_FALSE(timer_2.IsExpired());
|
||||
const auto first_check_point = Now();
|
||||
|
||||
while (!timer_2.IsExpired()) {
|
||||
ASSERT_LT(ElapsedMilis(before, Now()), 2 * kIntervalInMilis);
|
||||
}
|
||||
const auto second_check_point = Now();
|
||||
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_TRUE(timer_2.IsExpired());
|
||||
|
||||
EXPECT_LT(ElapsedMilis(before, first_check_point), kIntervalInMilis / 2);
|
||||
EXPECT_NEAR(ElapsedMilis(before, second_check_point), kIntervalInMilis, kAbsoluteErrorInMilis);
|
||||
}
|
||||
|
||||
TEST(AsyncTimer, MoveAssign) {
|
||||
const auto before = Now();
|
||||
AsyncTimer timer_1{2 * kIntervalInSeconds};
|
||||
AsyncTimer timer_2{kIntervalInSeconds};
|
||||
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_FALSE(timer_2.IsExpired());
|
||||
const auto first_check_point = Now();
|
||||
|
||||
timer_2 = std::move(timer_1);
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_FALSE(timer_2.IsExpired());
|
||||
|
||||
while (!timer_2.IsExpired()) {
|
||||
ASSERT_LT(ElapsedMilis(before, Now()), 3 * kIntervalInMilis);
|
||||
}
|
||||
const auto second_check_point = Now();
|
||||
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_TRUE(timer_2.IsExpired());
|
||||
|
||||
EXPECT_LT(ElapsedMilis(before, first_check_point), kIntervalInMilis / 2);
|
||||
EXPECT_NEAR(ElapsedMilis(before, second_check_point), 2 * kIntervalInMilis, kAbsoluteErrorInMilis);
|
||||
}
|
||||
|
||||
TEST(AsyncTimer, AssignToExpiredTimer) {
|
||||
const auto before = Now();
|
||||
AsyncTimer timer_1{2 * kIntervalInSeconds};
|
||||
AsyncTimer timer_2{kIntervalInSeconds};
|
||||
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_FALSE(timer_2.IsExpired());
|
||||
const auto first_check_point = Now();
|
||||
|
||||
while (!timer_2.IsExpired()) {
|
||||
ASSERT_LT(ElapsedMilis(before, Now()), 3 * kIntervalInMilis);
|
||||
}
|
||||
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_TRUE(timer_2.IsExpired());
|
||||
const auto second_check_point = Now();
|
||||
|
||||
timer_2 = std::move(timer_1);
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_FALSE(timer_2.IsExpired());
|
||||
const auto third_check_point = Now();
|
||||
|
||||
while (!timer_2.IsExpired()) {
|
||||
ASSERT_LT(ElapsedMilis(before, Now()), 3 * kIntervalInMilis);
|
||||
}
|
||||
|
||||
EXPECT_FALSE(timer_1.IsExpired());
|
||||
EXPECT_TRUE(timer_2.IsExpired());
|
||||
const auto fourth_check_point = Now();
|
||||
|
||||
EXPECT_LT(ElapsedMilis(before, first_check_point), kIntervalInMilis / 2);
|
||||
EXPECT_NEAR(ElapsedMilis(before, second_check_point), kIntervalInMilis, kAbsoluteErrorInMilis);
|
||||
EXPECT_LT(ElapsedMilis(before, third_check_point), 1.5 * kIntervalInMilis);
|
||||
EXPECT_NEAR(ElapsedMilis(before, fourth_check_point), 2 * kIntervalInMilis, kAbsoluteErrorInMilis);
|
||||
}
|
||||
|
||||
TEST(AsyncTimer, DestroyTimerWhileItIsStillRunning) {
|
||||
{ AsyncTimer timer_to_destroy{kIntervalInSeconds}; }
|
||||
const auto before = Now();
|
||||
AsyncTimer timer_to_wait{1.5 * kIntervalInSeconds};
|
||||
while (!timer_to_wait.IsExpired()) {
|
||||
ASSERT_LT(ElapsedMilis(before, Now()), 3 * kIntervalInMilis);
|
||||
}
|
||||
// At this point the timer_to_destroy has expired, nothing bad happened. This doesn't mean the timer cancellation
|
||||
// works properly, it just means that nothing bad happens if a timer get cancelled.
|
||||
}
|
||||
|
||||
TEST(AsyncTimer, TimersWithExtremeValues) {
|
||||
AsyncTimer timer_with_zero{0};
|
||||
const double expected_maximum_value = std::nexttoward(std::numeric_limits<time_t>::max(), 0.0);
|
||||
AsyncTimer timer_with_max_value{expected_maximum_value};
|
||||
}
|
Loading…
Reference in New Issue
Block a user