diff --git a/src/query/context.hpp b/src/query/context.hpp index 2bced7a1a..afdd83f5d 100644 --- a/src/query/context.hpp +++ b/src/query/context.hpp @@ -1,11 +1,13 @@ #pragma once +#include <type_traits> + #include "query/common.hpp" #include "query/frontend/semantic/symbol_table.hpp" #include "query/parameters.hpp" #include "query/plan/profile.hpp" #include "query/trigger.hpp" -#include "utils/tsc.hpp" +#include "utils/async_timer.hpp" namespace query { @@ -50,20 +52,25 @@ struct ExecutionContext { DbAccessor *db_accessor{nullptr}; SymbolTable symbol_table; EvaluationContext evaluation_context; - utils::TSCTimer execution_tsc_timer; - double max_execution_time_sec{0.0}; std::atomic<bool> *is_shutting_down{nullptr}; bool is_profile_query{false}; std::chrono::duration<double> profile_execution_time; plan::ProfilingStats stats; plan::ProfilingStats *stats_root{nullptr}; TriggerContextCollector *trigger_context_collector{nullptr}; + utils::AsyncTimer timer; }; +static_assert(std::is_move_assignable_v<ExecutionContext>, "ExecutionContext must be move assignable!"); +static_assert(std::is_move_constructible_v<ExecutionContext>, "ExecutionContext must be move constructible!"); + inline bool MustAbort(const ExecutionContext &context) { - return (context.is_shutting_down && context.is_shutting_down->load(std::memory_order_acquire)) || - (context.max_execution_time_sec > 0 && - context.execution_tsc_timer.Elapsed() >= context.max_execution_time_sec); + return (context.is_shutting_down != nullptr && context.is_shutting_down->load(std::memory_order_acquire)) || + context.timer.IsExpired(); +} + +inline plan::ProfilingStatsWithTotalTime GetStatsWithTotalTime(const ExecutionContext &context) { + return plan::ProfilingStatsWithTotalTime{context.stats, context.profile_execution_time}; } } // namespace query diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 69bc4e3d6..de3a341b5 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -473,9 +473,9 @@ struct PullPlan { DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, TriggerContextCollector *trigger_context_collector = nullptr, std::optional<size_t> memory_limit = {}); - std::optional<ExecutionContext> Pull(AnyStream *stream, std::optional<int> n, - const std::vector<Symbol> &output_symbols, - std::map<std::string, TypedValue> *summary); + std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n, + const std::vector<Symbol> &output_symbols, + std::map<std::string, TypedValue> *summary); private: std::shared_ptr<CachedPlan> plan_ = nullptr; @@ -513,16 +513,17 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par ctx_.evaluation_context.parameters = parameters; ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, dba); ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba); - ctx_.execution_tsc_timer = utils::TSCTimer(interpreter_context->tsc_frequency); - ctx_.max_execution_time_sec = interpreter_context->execution_timeout_sec; + if (interpreter_context->execution_timeout_sec > 0) { + ctx_.timer = utils::AsyncTimer{interpreter_context->execution_timeout_sec}; + } ctx_.is_shutting_down = &interpreter_context->is_shutting_down; ctx_.is_profile_query = is_profile_query; ctx_.trigger_context_collector = trigger_context_collector; } -std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<int> n, - const std::vector<Symbol> &output_symbols, - std::map<std::string, TypedValue> *summary) { +std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n, + const std::vector<Symbol> &output_symbols, + std::map<std::string, TypedValue> *summary) { // Set up temporary memory for a single Pull. Initial memory comes from the // stack. 256 KiB should fit on the stack and should be more than enough for a // single `Pull`. @@ -595,7 +596,7 @@ std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional< summary->insert_or_assign("plan_execution_time", execution_time_.count()); cursor_->Shutdown(); ctx_.profile_execution_time = execution_time_; - return std::move(ctx_); + return GetStatsWithTotalTime(ctx_); } using RWType = plan::ReadWriteTypeChecker::RWType; @@ -828,32 +829,33 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra auto rw_type_checker = plan::ReadWriteTypeChecker(); rw_type_checker.InferRWType(const_cast<plan::LogicalOperator &>(cypher_query_plan->plan())); - return PreparedQuery{ - {"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"}, - std::move(parsed_query.required_privileges), - [plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), summary, dba, - interpreter_context, execution_memory, memory_limit, - // We want to execute the query we are profiling lazily, so we delay - // the construction of the corresponding context. - ctx = std::optional<ExecutionContext>{}, pull_plan = std::shared_ptr<PullPlanVector>(nullptr)]( - AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> { - // No output symbols are given so that nothing is streamed. - if (!ctx) { - ctx = PullPlan(plan, parameters, true, dba, interpreter_context, execution_memory, nullptr, memory_limit) - .Pull(stream, {}, {}, summary); - pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(ctx->stats, ctx->profile_execution_time)); - } + return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"}, + std::move(parsed_query.required_privileges), + [plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), + summary, dba, interpreter_context, execution_memory, memory_limit, + // We want to execute the query we are profiling lazily, so we delay + // the construction of the corresponding context. + stats_and_total_time = std::optional<plan::ProfilingStatsWithTotalTime>{}, + pull_plan = std::shared_ptr<PullPlanVector>(nullptr)]( + AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> { + // No output symbols are given so that nothing is streamed. + if (!stats_and_total_time) { + stats_and_total_time = PullPlan(plan, parameters, true, dba, interpreter_context, + execution_memory, nullptr, memory_limit) + .Pull(stream, {}, {}, summary); + pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(*stats_and_total_time)); + } - MG_ASSERT(ctx, "Failed to execute the query!"); + MG_ASSERT(stats_and_total_time, "Failed to execute the query!"); - if (pull_plan->Pull(stream, n)) { - summary->insert_or_assign("profile", ProfilingStatsToJson(ctx->stats, ctx->profile_execution_time).dump()); - return QueryHandlerResult::ABORT; - } + if (pull_plan->Pull(stream, n)) { + summary->insert_or_assign("profile", ProfilingStatsToJson(*stats_and_total_time).dump()); + return QueryHandlerResult::ABORT; + } - return std::nullopt; - }, - rw_type_checker.type}; + return std::nullopt; + }, + rw_type_checker.type}; } PreparedQuery PrepareDumpQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary, DbAccessor *dba, @@ -1602,9 +1604,8 @@ void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, Interpret trigger_context.AdaptForAccessor(&db_accessor); try { - trigger.Execute(&db_accessor, &execution_memory, *interpreter_context->tsc_frequency, - interpreter_context->execution_timeout_sec, &interpreter_context->is_shutting_down, - trigger_context); + trigger.Execute(&db_accessor, &execution_memory, interpreter_context->execution_timeout_sec, + &interpreter_context->is_shutting_down, trigger_context); } catch (const utils::BasicException &exception) { spdlog::warn("Trigger '{}' failed with exception:\n{}", trigger.Name(), exception.what()); db_accessor.Abort(); @@ -1658,9 +1659,8 @@ void Interpreter::Commit() { utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize}; AdvanceCommand(); try { - trigger.Execute(&*execution_db_accessor_, &execution_memory, *interpreter_context_->tsc_frequency, - interpreter_context_->execution_timeout_sec, &interpreter_context_->is_shutting_down, - *trigger_context); + trigger.Execute(&*execution_db_accessor_, &execution_memory, interpreter_context_->execution_timeout_sec, + &interpreter_context_->is_shutting_down, *trigger_context); } catch (const utils::BasicException &e) { throw utils::BasicException( fmt::format("Trigger '{}' caused the transaction to fail.\nException: {}", trigger.Name(), e.what())); diff --git a/src/query/plan/profile.cpp b/src/query/plan/profile.cpp index f8afeb03c..bafba17d1 100644 --- a/src/query/plan/profile.cpp +++ b/src/query/plan/profile.cpp @@ -98,10 +98,9 @@ class ProfilingStatsToTableHelper { } // namespace -std::vector<std::vector<TypedValue>> ProfilingStatsToTable(const ProfilingStats &cumulative_stats, - std::chrono::duration<double> total_time) { - ProfilingStatsToTableHelper helper{cumulative_stats.num_cycles, total_time}; - helper.Output(cumulative_stats); +std::vector<std::vector<TypedValue>> ProfilingStatsToTable(const ProfilingStatsWithTotalTime &stats) { + ProfilingStatsToTableHelper helper{stats.cumulative_stats.num_cycles, stats.total_time}; + helper.Output(stats.cumulative_stats); return helper.rows(); } @@ -147,9 +146,9 @@ class ProfilingStatsToJsonHelper { } // namespace -nlohmann::json ProfilingStatsToJson(const ProfilingStats &cumulative_stats, std::chrono::duration<double> total_time) { - ProfilingStatsToJsonHelper helper{cumulative_stats.num_cycles, total_time}; - helper.Output(cumulative_stats); +nlohmann::json ProfilingStatsToJson(const ProfilingStatsWithTotalTime &stats) { + ProfilingStatsToJsonHelper helper{stats.cumulative_stats.num_cycles, stats.total_time}; + helper.Output(stats.cumulative_stats); return helper.ToJson(); } diff --git a/src/query/plan/profile.hpp b/src/query/plan/profile.hpp index bea2536a5..ed88f29e9 100644 --- a/src/query/plan/profile.hpp +++ b/src/query/plan/profile.hpp @@ -23,10 +23,14 @@ struct ProfilingStats { std::vector<ProfilingStats> children; }; -std::vector<std::vector<TypedValue>> ProfilingStatsToTable(const ProfilingStats &cumulative_stats, - std::chrono::duration<double>); +struct ProfilingStatsWithTotalTime { + ProfilingStats cumulative_stats{}; + std::chrono::duration<double> total_time{}; +}; -nlohmann::json ProfilingStatsToJson(const ProfilingStats &cumulative_stats, std::chrono::duration<double>); +std::vector<std::vector<TypedValue>> ProfilingStatsToTable(const ProfilingStatsWithTotalTime &stats); + +nlohmann::json ProfilingStatsToJson(const ProfilingStatsWithTotalTime &stats); } // namespace plan } // namespace query diff --git a/src/query/trigger.cpp b/src/query/trigger.cpp index 401e43d50..89fb90681 100644 --- a/src/query/trigger.cpp +++ b/src/query/trigger.cpp @@ -172,7 +172,7 @@ std::shared_ptr<Trigger::TriggerPlan> Trigger::GetPlan(DbAccessor *db_accessor) return trigger_plan_; } -void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, const double tsc_frequency, +void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, const double max_execution_time_sec, std::atomic<bool> *is_shutting_down, const TriggerContext &context) const { if (!context.ShouldEventTrigger(event_type_)) { @@ -193,8 +193,7 @@ void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution ctx.evaluation_context.parameters = parsed_statements_.parameters; ctx.evaluation_context.properties = NamesToProperties(plan.ast_storage().properties_, dba); ctx.evaluation_context.labels = NamesToLabels(plan.ast_storage().labels_, dba); - ctx.execution_tsc_timer = utils::TSCTimer(tsc_frequency); - ctx.max_execution_time_sec = max_execution_time_sec; + ctx.timer = utils::AsyncTimer(max_execution_time_sec); ctx.is_shutting_down = is_shutting_down; ctx.is_profile_query = false; diff --git a/src/query/trigger.hpp b/src/query/trigger.hpp index 1eb22bd08..38e9005ef 100644 --- a/src/query/trigger.hpp +++ b/src/query/trigger.hpp @@ -23,8 +23,8 @@ struct Trigger { const std::map<std::string, storage::PropertyValue> &user_parameters, TriggerEventType event_type, utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor, utils::SpinLock *antlr_lock); - void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double tsc_frequency, - double max_execution_time_sec, std::atomic<bool> *is_shutting_down, const TriggerContext &context) const; + void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double max_execution_time_sec, + std::atomic<bool> *is_shutting_down, const TriggerContext &context) const; bool operator==(const Trigger &other) const { return name_ == other.name_; } // NOLINTNEXTLINE (modernize-use-nullptr) diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index b63d8e530..bd806e870 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -1,4 +1,5 @@ set(utils_src_files + async_timer.cpp event_counter.cpp csv_parsing.cpp file.cpp @@ -13,7 +14,7 @@ set(utils_src_files uuid.cpp) add_library(mg-utils STATIC ${utils_src_files}) -target_link_libraries(mg-utils stdc++fs Threads::Threads spdlog fmt gflags uuid) +target_link_libraries(mg-utils stdc++fs Threads::Threads spdlog fmt gflags uuid rt) add_library(mg-new-delete STATIC new_delete.cpp) target_link_libraries(mg-new-delete jemalloc fmt) diff --git a/src/utils/async_timer.cpp b/src/utils/async_timer.cpp new file mode 100644 index 000000000..afe90ce7f --- /dev/null +++ b/src/utils/async_timer.cpp @@ -0,0 +1,187 @@ +#include "utils/async_timer.hpp" + +#include <csignal> + +#include <algorithm> +#include <atomic> +#include <cmath> +#include <cstdint> +#include <limits> + +#include "utils/skip_list.hpp" +#include "utils/spin_lock.hpp" +#include "utils/synchronized.hpp" + +namespace { + +constexpr uint64_t kInvalidFlagId = 0U; +// std::numeric_limits<time_t>::max() cannot be represented precisely as a double, so the next smallest value is the +// maximum number of seconds the timer can be used with +const double max_seconds_as_double = std::nexttoward(std::numeric_limits<time_t>::max(), 0.0); + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +std::atomic<uint64_t> expiration_flag_counter{kInvalidFlagId + 1U}; + +struct ExpirationFlagInfo { + uint64_t id{0U}; + std::weak_ptr<std::atomic<bool>> flag{}; +}; + +bool operator==(const ExpirationFlagInfo &lhs, const ExpirationFlagInfo &rhs) { return lhs.id == rhs.id; } +bool operator<(const ExpirationFlagInfo &lhs, const ExpirationFlagInfo &rhs) { return lhs.id < rhs.id; } +bool operator==(const ExpirationFlagInfo &flag_info, const uint64_t id) { return flag_info.id == id; } +bool operator<(const ExpirationFlagInfo &flag_info, const uint64_t id) { return flag_info.id < id; } + +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +utils::SkipList<ExpirationFlagInfo> expiration_flags{}; + +uint64_t AddFlag(std::weak_ptr<std::atomic<bool>> flag) { + const auto id = expiration_flag_counter.fetch_add(1, std::memory_order_relaxed); + expiration_flags.access().insert({id, std::move(flag)}); + return id; +} + +void EraseFlag(uint64_t flag_id) { expiration_flags.access().remove(flag_id); } + +std::weak_ptr<std::atomic<bool>> GetFlag(uint64_t flag_id) { + const auto flag_accessor = expiration_flags.access(); + const auto it = flag_accessor.find(flag_id); + if (it == flag_accessor.end()) { + return {}; + } + + return it->flag; +} + +void MarkDone(const uint64_t flag_id) { + const auto weak_flag = GetFlag(flag_id); + if (weak_flag.expired()) { + return; + } + auto flag = weak_flag.lock(); + if (flag != nullptr) { + flag->store(true, std::memory_order_relaxed); + } +} +} // namespace + +namespace utils { + +namespace { +struct ThreadInfo { + pid_t thread_id; + std::atomic<bool> setup_done{false}; +}; + +void *TimerBackgroundWorker(void *args) { + auto *thread_info = static_cast<ThreadInfo *>(args); + thread_info->thread_id = syscall(SYS_gettid); + thread_info->setup_done.store(true, std::memory_order_release); + + sigset_t ss; + sigemptyset(&ss); + sigaddset(&ss, SIGTIMER); + sigprocmask(SIG_BLOCK, &ss, nullptr); + + while (true) { + siginfo_t si; + int result = sigwaitinfo(&ss, &si); + + if (result <= 0) { + continue; + } + + if (si.si_code == SI_TIMER) { + auto flag_id = kInvalidFlagId; + std::memcpy(&flag_id, &si.si_value.sival_ptr, sizeof(flag_id)); + MarkDone(flag_id); + } else if (si.si_code == SI_TKILL) { + pthread_exit(nullptr); + } + } +} +} // namespace + +AsyncTimer::AsyncTimer() : flag_id_{kInvalidFlagId} {}; + +AsyncTimer::AsyncTimer(double seconds) + : expiration_flag_{std::make_shared<std::atomic<bool>>(false)}, flag_id_{kInvalidFlagId}, timer_id_{} { + MG_ASSERT(seconds <= max_seconds_as_double, + "The AsyncTimer cannot handle larger time values than {:f}, the specified value: {:f}", + max_seconds_as_double, seconds); + MG_ASSERT(seconds >= 0.0, "The AsyncTimer cannot handle negative time values: {:f}", seconds); + + static pthread_t background_timer_thread; + static ThreadInfo thread_info; + static std::once_flag timer_thread_setup_flag; + + std::call_once(timer_thread_setup_flag, [] { + pthread_create(&background_timer_thread, nullptr, TimerBackgroundWorker, &thread_info); + while (!thread_info.setup_done.load(std::memory_order_acquire)) + ; + }); + + flag_id_ = AddFlag(std::weak_ptr<std::atomic<bool>>{expiration_flag_}); + + sigevent notification_settings{}; + notification_settings.sigev_notify = SIGEV_THREAD_ID; + notification_settings.sigev_signo = SIGTIMER; + notification_settings._sigev_un._tid = thread_info.thread_id; + static_assert(sizeof(void *) == sizeof(flag_id_), "ID size must be equal to pointer size!"); + std::memcpy(¬ification_settings.sigev_value.sival_ptr, &flag_id_, sizeof(flag_id_)); + MG_ASSERT(timer_create(CLOCK_MONOTONIC, ¬ification_settings, &timer_id_) == 0, "Couldn't create timer: ({}) {}", + errno, strerror(errno)); + + constexpr auto kSecondsToNanos = 1000 * 1000 * 1000; + // Casting will truncate down, but that's exactly what we want. + const auto second_as_time_t = static_cast<time_t>(seconds); + const auto remaining_nano_seconds = static_cast<time_t>((seconds - second_as_time_t) * kSecondsToNanos); + + struct itimerspec spec; + spec.it_interval.tv_sec = 0; + spec.it_interval.tv_nsec = 0; + spec.it_value.tv_sec = second_as_time_t; + spec.it_value.tv_nsec = remaining_nano_seconds; + + MG_ASSERT(timer_settime(timer_id_, 0, &spec, nullptr) == 0, "Couldn't set timer: ({}) {}", errno, strerror(errno)); +} + +AsyncTimer::~AsyncTimer() { ReleaseResources(); } + +AsyncTimer::AsyncTimer(AsyncTimer &&other) noexcept + : expiration_flag_{std::move(other.expiration_flag_)}, flag_id_{other.flag_id_}, timer_id_{other.timer_id_} { + other.flag_id_ = kInvalidFlagId; +} + +// NOLINTNEXTLINE (hicpp-noexcept-move) +AsyncTimer &AsyncTimer::operator=(AsyncTimer &&other) { + if (this == &other) { + return *this; + } + + ReleaseResources(); + + expiration_flag_ = std::move(other.expiration_flag_); + flag_id_ = std::exchange(other.flag_id_, kInvalidFlagId); + timer_id_ = other.timer_id_; + + return *this; +}; + +bool AsyncTimer::IsExpired() const { + if (expiration_flag_ != nullptr) { + return expiration_flag_->load(std::memory_order_relaxed); + } + return false; +} + +void AsyncTimer::ReleaseResources() { + if (expiration_flag_ != nullptr) { + timer_delete(timer_id_); + EraseFlag(flag_id_); + flag_id_ = kInvalidFlagId; + expiration_flag_ = std::shared_ptr<std::atomic<bool>>{}; + } +} + +} // namespace utils diff --git a/src/utils/async_timer.hpp b/src/utils/async_timer.hpp new file mode 100644 index 000000000..4ac2ffc87 --- /dev/null +++ b/src/utils/async_timer.hpp @@ -0,0 +1,37 @@ +#pragma once +#include <time.h> + +#include <memory> + +#include "utils/logging.hpp" + +namespace utils { + +#define SIGTIMER (SIGRTMAX - 2) + +class AsyncTimer { + public: + AsyncTimer(); + explicit AsyncTimer(double seconds); + ~AsyncTimer(); + AsyncTimer(AsyncTimer &&other) noexcept; + // NOLINTNEXTLINE (hicpp-noexcept-move) + AsyncTimer &operator=(AsyncTimer &&other); + + AsyncTimer(const AsyncTimer &) = delete; + AsyncTimer &operator=(const AsyncTimer &) = delete; + + // Returns false if the object isn't associated with any timer. + bool IsExpired() const; + + private: + void ReleaseResources(); + + // If the expiration_flag_ is nullptr, then the object is not associated with any timer, therefore no clean up + // is necessary. Furthermore, the the POSIX API doesn't specify any value as "invalid" for timer_t, so the timer_id_ + // cannot be used to determine whether the object is associated with any timer or not. + std::shared_ptr<std::atomic<bool>> expiration_flag_; + uint64_t flag_id_; + timer_t timer_id_; +}; +} // namespace utils diff --git a/src/utils/skip_list.hpp b/src/utils/skip_list.hpp index fd2295fea..b5ee81544 100644 --- a/src/utils/skip_list.hpp +++ b/src/utils/skip_list.hpp @@ -666,7 +666,7 @@ class SkipList final { /// @return Iterator to the item in the list, will be equal to `end()` when /// the key isn't found template <typename TKey> - Iterator find(const TKey &key) { + Iterator find(const TKey &key) const { return skiplist_->template find(key); } @@ -676,7 +676,7 @@ class SkipList final { /// @return Iterator to the item in the list, will be equal to `end()` when /// no items match the search template <typename TKey> - Iterator find_equal_or_greater(const TKey &key) { + Iterator find_equal_or_greater(const TKey &key) const { return skiplist_->template find_equal_or_greater(key); } diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 5f61495a4..7a2c0e5e7 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -238,6 +238,9 @@ target_link_libraries(${test_prefix}utils_thread_pool mg-utils fmt) add_unit_test(utils_csv_parsing.cpp ${CMAKE_SOURCE_DIR}/src/utils/csv_parsing.cpp) target_link_libraries(${test_prefix}utils_csv_parsing mg-utils fmt) +add_unit_test(utils_async_timer.cpp) +target_link_libraries(${test_prefix}utils_async_timer mg-utils) + # Test mg-storage-v2 add_unit_test(commit_log_v2.cpp) diff --git a/tests/unit/query_profile.cpp b/tests/unit/query_profile.cpp index 6f80f3812..09d657201 100644 --- a/tests/unit/query_profile.cpp +++ b/tests/unit/query_profile.cpp @@ -19,7 +19,7 @@ TEST(QueryProfileTest, SimpleQuery) { // | * Once | 2 | 25.000000 % | 0.250000 ms | // +---------------+---------------+---------------+---------------+ // clang-format: on - auto table = ProfilingStatsToTable(produce, total_time); + auto table = ProfilingStatsToTable(ProfilingStatsWithTotalTime{produce, total_time}); EXPECT_EQ(table[0][0].ValueString(), "* Produce"); EXPECT_EQ(table[0][1].ValueInt(), 2); @@ -48,7 +48,7 @@ TEST(QueryProfileTest, SimpleQuery) { // "relative_time": 0.75 // } // clang-format: on - auto json = ProfilingStatsToJson(produce, total_time); + auto json = ProfilingStatsToJson(ProfilingStatsWithTotalTime{produce, total_time}); /* * NOTE: When one of these comparions fails and Google Test tries to report @@ -94,7 +94,7 @@ TEST(QueryProfileTest, ComplicatedQuery) { // | * Once (1) | 2 | 5.000000 % | 0.050000 ms | // +----------------+----------------+----------------+----------------+ // clang-format: on - auto table = ProfilingStatsToTable(produce, total_time); + auto table = ProfilingStatsToTable({produce, total_time}); EXPECT_EQ(table[0][0].ValueString(), "* Produce"); EXPECT_EQ(table[0][1].ValueInt(), 2); @@ -209,7 +209,7 @@ TEST(QueryProfileTest, ComplicatedQuery) { // "relative_time": 0.1, // } // clang-format: on - auto json = ProfilingStatsToJson(produce, total_time); + auto json = ProfilingStatsToJson(ProfilingStatsWithTotalTime{produce, total_time}); EXPECT_EQ(json["actual_hits"], 2); EXPECT_EQ(json["relative_time"], 0.1); diff --git a/tests/unit/utils_async_timer.cpp b/tests/unit/utils_async_timer.cpp new file mode 100644 index 000000000..65fa69dee --- /dev/null +++ b/tests/unit/utils_async_timer.cpp @@ -0,0 +1,138 @@ +#include <chrono> +#include <cmath> +#include <limits> + +#include "gtest/gtest.h" + +#include "utils/async_timer.hpp" + +using AsyncTimer = utils::AsyncTimer; + +constexpr auto kSecondsInMilis = 1000.0; +constexpr auto kIntervalInSeconds = 0.3; +constexpr auto kIntervalInMilis = kIntervalInSeconds * kSecondsInMilis; +constexpr auto kAbsoluteErrorInMilis = 50; + +std::chrono::steady_clock::time_point Now() { return std::chrono::steady_clock::now(); } + +int ElapsedMilis(const std::chrono::steady_clock::time_point &start, const std::chrono::steady_clock::time_point &end) { + return std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(); +} + +void CheckTimeSimple() { + const auto before = Now(); + AsyncTimer timer{kIntervalInSeconds}; + while (!timer.IsExpired()) { + ASSERT_LT(ElapsedMilis(before, Now()), 2 * kIntervalInMilis); + } + + const auto after = Now(); + + EXPECT_NEAR(ElapsedMilis(before, after), kIntervalInMilis, kAbsoluteErrorInMilis); +} + +TEST(AsyncTimer, SimpleWait) { CheckTimeSimple(); } + +TEST(AsyncTimer, DoubleWait) { + CheckTimeSimple(); + CheckTimeSimple(); +} + +TEST(AsyncTimer, MoveConstruct) { + const auto before = Now(); + AsyncTimer timer_1{kIntervalInSeconds}; + AsyncTimer timer_2{std::move(timer_1)}; + + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_FALSE(timer_2.IsExpired()); + const auto first_check_point = Now(); + + while (!timer_2.IsExpired()) { + ASSERT_LT(ElapsedMilis(before, Now()), 2 * kIntervalInMilis); + } + const auto second_check_point = Now(); + + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_TRUE(timer_2.IsExpired()); + + EXPECT_LT(ElapsedMilis(before, first_check_point), kIntervalInMilis / 2); + EXPECT_NEAR(ElapsedMilis(before, second_check_point), kIntervalInMilis, kAbsoluteErrorInMilis); +} + +TEST(AsyncTimer, MoveAssign) { + const auto before = Now(); + AsyncTimer timer_1{2 * kIntervalInSeconds}; + AsyncTimer timer_2{kIntervalInSeconds}; + + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_FALSE(timer_2.IsExpired()); + const auto first_check_point = Now(); + + timer_2 = std::move(timer_1); + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_FALSE(timer_2.IsExpired()); + + while (!timer_2.IsExpired()) { + ASSERT_LT(ElapsedMilis(before, Now()), 3 * kIntervalInMilis); + } + const auto second_check_point = Now(); + + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_TRUE(timer_2.IsExpired()); + + EXPECT_LT(ElapsedMilis(before, first_check_point), kIntervalInMilis / 2); + EXPECT_NEAR(ElapsedMilis(before, second_check_point), 2 * kIntervalInMilis, kAbsoluteErrorInMilis); +} + +TEST(AsyncTimer, AssignToExpiredTimer) { + const auto before = Now(); + AsyncTimer timer_1{2 * kIntervalInSeconds}; + AsyncTimer timer_2{kIntervalInSeconds}; + + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_FALSE(timer_2.IsExpired()); + const auto first_check_point = Now(); + + while (!timer_2.IsExpired()) { + ASSERT_LT(ElapsedMilis(before, Now()), 3 * kIntervalInMilis); + } + + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_TRUE(timer_2.IsExpired()); + const auto second_check_point = Now(); + + timer_2 = std::move(timer_1); + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_FALSE(timer_2.IsExpired()); + const auto third_check_point = Now(); + + while (!timer_2.IsExpired()) { + ASSERT_LT(ElapsedMilis(before, Now()), 3 * kIntervalInMilis); + } + + EXPECT_FALSE(timer_1.IsExpired()); + EXPECT_TRUE(timer_2.IsExpired()); + const auto fourth_check_point = Now(); + + EXPECT_LT(ElapsedMilis(before, first_check_point), kIntervalInMilis / 2); + EXPECT_NEAR(ElapsedMilis(before, second_check_point), kIntervalInMilis, kAbsoluteErrorInMilis); + EXPECT_LT(ElapsedMilis(before, third_check_point), 1.5 * kIntervalInMilis); + EXPECT_NEAR(ElapsedMilis(before, fourth_check_point), 2 * kIntervalInMilis, kAbsoluteErrorInMilis); +} + +TEST(AsyncTimer, DestroyTimerWhileItIsStillRunning) { + { AsyncTimer timer_to_destroy{kIntervalInSeconds}; } + const auto before = Now(); + AsyncTimer timer_to_wait{1.5 * kIntervalInSeconds}; + while (!timer_to_wait.IsExpired()) { + ASSERT_LT(ElapsedMilis(before, Now()), 3 * kIntervalInMilis); + } + // At this point the timer_to_destroy has expired, nothing bad happened. This doesn't mean the timer cancellation + // works properly, it just means that nothing bad happens if a timer get cancelled. +} + +TEST(AsyncTimer, TimersWithExtremeValues) { + AsyncTimer timer_with_zero{0}; + const double expected_maximum_value = std::nexttoward(std::numeric_limits<time_t>::max(), 0.0); + AsyncTimer timer_with_max_value{expected_maximum_value}; +}