diff --git a/tests/concurrent/measure_time.cpp b/poc/measure_time.cpp similarity index 98% rename from tests/concurrent/measure_time.cpp rename to poc/measure_time.cpp index f20d3f10d..0d68f65d0 100644 --- a/tests/concurrent/measure_time.cpp +++ b/poc/measure_time.cpp @@ -4,8 +4,8 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include "timer.hpp" #include "utils/assert.hpp" -#include "utils/timer.hpp" using namespace std::chrono_literals; using namespace utils; diff --git a/poc/timer.hpp b/poc/timer.hpp new file mode 100644 index 000000000..07cd2b6aa --- /dev/null +++ b/poc/timer.hpp @@ -0,0 +1,157 @@ +#pragma once + +#include <atomic> +#include <chrono> +#include <memory> +#include <set> +#include <thread> + +#include <glog/logging.h> + +namespace utils { + +/** + * @class Timer + * + * @brief The timer contains counter and handler which is executed when the time + * exceedes. + * + * With every clock interval the counter should be decreased for + * delta count. Delta count is one for now but it should be a variable in the + * near future. The handler is function that will be called when counter + * becomes zero or smaller than zero. + */ +struct Timer { + using sptr = std::shared_ptr<Timer>; + using handler_t = std::function<void(void)>; + + Timer(int64_t counter, handler_t handler) + : counter(counter), handler(handler) {} + + bool operator--() { + if (--counter <= 0) + return true; + else + return false; + } + + int64_t counter; + handler_t handler; +}; + +/** + * Timer container knows how to add a new timer and remove the + * existing container from itself. Also, time container object + * has the process method whose responsibility is to iterate + * over existing timers and call the appropriate handler function. + * The handler method could be called on the same thread, on a + * separate thread or on a thread pool, that is implementation detail of + * the process method. + */ + +/** + * @class TimerSet + * + * @brief Trivial timer container implementation. + * + * Internal data stucture for storage of timers is std::set. So, the + * related timer complexities are: + * insertion: O(log(n)) + * deletion: O(log(n)) + * process: O(n) + */ +class TimerSet { + public: + void add(Timer::sptr timer) { timers.insert(timer); } + + void remove(Timer::sptr timer) { timers.erase(timer); } + + uint64_t size() const { return timers.size(); } + + void process() { + for (auto it = timers.begin(); it != timers.end();) { + auto timer = *it; + if (--*timer) { + timer->handler(); + it = timers.erase(it); + continue; + } + ++it; + } + } + + private: + std::set<std::shared_ptr<Timer>> timers; +}; + +/** + * @class TimerScheduler + * + * @brief TimerScheduler is a manager class and its responsibility is to + * take care of the time and call the timer_container process method in the + * appropriate time. + * + * @tparam timer_container_type implements a strategy how the timers + * are processed + * @tparam delta_time_type type of a time distance between two events + * @tparam delta_time granularity between the two events, default value is 1 + */ +template <typename timer_container_type, typename delta_time_type, + uint64_t delta_time = 1> +class TimerScheduler { + public: + /** + * Adds a timer. + * + * @param timer shared pointer to the timer object \ref Timer + */ + void add(Timer::sptr timer) { timer_container.add(timer); } + + /** + * Removes a timer. + * + * @param timer shared pointer to the timer object \ref Timer + */ + void remove(Timer::sptr timer) { timer_container.remove(timer); } + + /** + * Provides the number of pending timers. The exact number has to be + * provided by a timer_container. + * + * @return uint64_t the number of pending timers. + */ + uint64_t size() const { return timer_container.size(); } + + /** + * Runs a separate thread which responsibility is to run the process method + * at the appropriate time (every delta_time from the beginning of + * processing. + */ + void run() { + is_running.store(true); + + run_thread = std::thread([this]() { + while (is_running.load()) { + std::this_thread::sleep_for(delta_time_type(delta_time)); + timer_container.process(); + DLOG(INFO) << "timer_container.process()"; + } + }); + } + + /** + * Stops the whole processing. + */ + void stop() { is_running.store(false); } + + /** + * Joins the processing thread. + */ + ~TimerScheduler() { run_thread.join(); } + + private: + timer_container_type timer_container; + std::thread run_thread; + std::atomic<bool> is_running; +}; +} diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index bb16c5cf1..d887ac760 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -72,7 +72,7 @@ GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir) tx_engine_.ForEachActiveTransaction([](tx::Transaction &t) { if (t.creation_time() + std::chrono::seconds(FLAGS_query_execution_time_sec) < - std::chrono::system_clock::now()) { + std::chrono::steady_clock::now()) { t.set_should_abort(); }; }); diff --git a/src/query/engine.hpp b/src/query/engine.hpp index 4c2a9829f..3adab8adb 100644 --- a/src/query/engine.hpp +++ b/src/query/engine.hpp @@ -14,6 +14,7 @@ namespace fs = std::experimental::filesystem; #include "query/plan_interface.hpp" #include "utils/datetime/timestamp.hpp" #include "utils/dynamic_lib.hpp" +#include "utils/timer.hpp" DECLARE_bool(interpret); DECLARE_string(compile_directory); @@ -71,13 +72,18 @@ class QueryEngine { return true; } - clock_t start_time = clock(); + utils::Timer parsing_timer; query::StrippedQuery stripped(query); - clock_t end_parsing_time = clock(); + auto parsing_time = parsing_timer.Elapsed(); + + utils::Timer planning_timer; auto plan = LoadCypher(stripped); - clock_t end_planning_time = clock(); + auto planning_time = planning_timer.Elapsed(); + + utils::Timer execution_timer; auto result = plan->run(db_accessor, stripped.literals(), stream); - clock_t end_execution_time = clock(); + auto execution_time = execution_timer.Elapsed(); + if (UNLIKELY(!result)) { // info because it might be something like deadlock in which // case one thread is stopped and user has try again @@ -91,13 +97,11 @@ class QueryEngine { }; std::map<std::string, query::TypedValue> summary; - summary["query_parsing_time"] = time_second(start_time, end_parsing_time); + summary["query_parsing_time"] = parsing_time.count(); // This doesn't do any actual planning, but benchmarking harness knows how // to work with this field. - summary["query_planning_time"] = - time_second(end_parsing_time, end_planning_time); - summary["query_plan_execution_time"] = - time_second(end_planning_time, end_execution_time); + summary["query_planning_time"] = planning_time.count(); + summary["query_plan_execution_time"] = execution_time.count(); summary["type"] = "rw"; stream.Summary(summary); diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index ab0f237b3..f5ce97146 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -15,6 +15,7 @@ #include "query/interpret/frame.hpp" #include "query/plan/cost_estimator.hpp" #include "query/plan/planner.hpp" +#include "utils/timer.hpp" // TODO: Remove ast_cache flag and add flag that limits cache size. DECLARE_bool(ast_cache); @@ -28,8 +29,7 @@ class Interpreter { template <typename Stream> void Interpret(const std::string &query, GraphDbAccessor &db_accessor, Stream &stream) { - clock_t start_time = clock(); - + utils::Timer frontend_timer; Config config; Context ctx(config, db_accessor); std::map<std::string, TypedValue> summary; @@ -69,9 +69,9 @@ class Interpreter { } return it->second.Plug(stripped.literals(), stripped.named_expressions()); }(); + auto frontend_time = frontend_timer.Elapsed(); - clock_t frontend_end_time = clock(); - + utils::Timer planning_timer; // symbol table fill SymbolTable symbol_table; SymbolGenerator symbol_generator(symbol_table); @@ -106,9 +106,9 @@ class Interpreter { // generate frame based on symbol table max_position Frame frame(symbol_table.max_position()); + auto planning_time = planning_timer.Elapsed(); - clock_t planning_end_time = clock(); - + utils::Timer execution_timer; std::vector<std::string> header; std::vector<Symbol> output_symbols( logical_plan->OutputSymbols(symbol_table)); @@ -144,19 +144,16 @@ class Interpreter { } else { throw QueryRuntimeException("Unknown top level LogicalOperator"); } - - clock_t execution_end_time = clock(); + auto execution_time = execution_timer.Elapsed(); // helper function for calculating time in seconds auto time_second = [](clock_t start, clock_t end) { return TypedValue(double(end - start) / CLOCKS_PER_SEC); }; - summary["query_parsing_time"] = time_second(start_time, frontend_end_time); - summary["query_planning_time"] = - time_second(frontend_end_time, planning_end_time); - summary["query_plan_execution_time"] = - time_second(planning_end_time, execution_end_time); + summary["query_parsing_time"] = frontend_time.count(); + summary["query_planning_time"] = planning_time.count(); + summary["query_plan_execution_time"] = execution_time.count(); summary["query_cost_estimate"] = query_plan_cost_estimation; // TODO: set summary['type'] based on transaction metadata diff --git a/src/utils/cpu_relax.hpp b/src/threading/sync/cpu_relax.hpp similarity index 100% rename from src/utils/cpu_relax.hpp rename to src/threading/sync/cpu_relax.hpp diff --git a/src/threading/sync/futex.hpp b/src/threading/sync/futex.hpp index d569a0cf7..690abfa1e 100644 --- a/src/threading/sync/futex.hpp +++ b/src/threading/sync/futex.hpp @@ -10,7 +10,7 @@ #include <unistd.h> #include "threading/sync/lock_timeout_exception.hpp" -#include "utils/cpu_relax.hpp" +#include "threading/sync/cpu_relax.hpp" namespace sys { inline int futex(void *addr1, int op, int val1, const struct timespec *timeout, diff --git a/src/threading/sync/spinlock.hpp b/src/threading/sync/spinlock.hpp index a515d68ef..048632072 100644 --- a/src/threading/sync/spinlock.hpp +++ b/src/threading/sync/spinlock.hpp @@ -3,7 +3,7 @@ #include <unistd.h> #include <atomic> -#include "utils/cpu_relax.hpp" +#include "threading/sync/cpu_relax.hpp" /** * @class SpinLock diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp index ec014081d..1d54fd1a0 100644 --- a/src/transactions/transaction.hpp +++ b/src/transactions/transaction.hpp @@ -83,7 +83,7 @@ class Transaction { // should stop execution, it is only a hint, transaction can disobey. std::atomic<bool> should_abort_{false}; // Creation time. - const std::chrono::time_point<std::chrono::system_clock> creation_time_{ - std::chrono::system_clock::now()}; + const std::chrono::time_point<std::chrono::steady_clock> creation_time_{ + std::chrono::steady_clock::now()}; }; } diff --git a/src/utils/timer.hpp b/src/utils/timer.hpp index 07cd2b6aa..0eab6c049 100644 --- a/src/utils/timer.hpp +++ b/src/utils/timer.hpp @@ -1,157 +1,18 @@ #pragma once -#include <atomic> #include <chrono> -#include <memory> -#include <set> -#include <thread> - -#include <glog/logging.h> namespace utils { -/** - * @class Timer - * - * @brief The timer contains counter and handler which is executed when the time - * exceedes. - * - * With every clock interval the counter should be decreased for - * delta count. Delta count is one for now but it should be a variable in the - * near future. The handler is function that will be called when counter - * becomes zero or smaller than zero. - */ -struct Timer { - using sptr = std::shared_ptr<Timer>; - using handler_t = std::function<void(void)>; - - Timer(int64_t counter, handler_t handler) - : counter(counter), handler(handler) {} - - bool operator--() { - if (--counter <= 0) - return true; - else - return false; - } - - int64_t counter; - handler_t handler; -}; - -/** - * Timer container knows how to add a new timer and remove the - * existing container from itself. Also, time container object - * has the process method whose responsibility is to iterate - * over existing timers and call the appropriate handler function. - * The handler method could be called on the same thread, on a - * separate thread or on a thread pool, that is implementation detail of - * the process method. - */ - -/** - * @class TimerSet - * - * @brief Trivial timer container implementation. - * - * Internal data stucture for storage of timers is std::set. So, the - * related timer complexities are: - * insertion: O(log(n)) - * deletion: O(log(n)) - * process: O(n) - */ -class TimerSet { +class Timer { public: - void add(Timer::sptr timer) { timers.insert(timer); } - - void remove(Timer::sptr timer) { timers.erase(timer); } - - uint64_t size() const { return timers.size(); } - - void process() { - for (auto it = timers.begin(); it != timers.end();) { - auto timer = *it; - if (--*timer) { - timer->handler(); - it = timers.erase(it); - continue; - } - ++it; - } + /** Time elapsed since creation. */ + std::chrono::duration<double> Elapsed() { + return std::chrono::steady_clock::now() - start_time_; } private: - std::set<std::shared_ptr<Timer>> timers; + std::chrono::time_point<std::chrono::steady_clock> start_time_ = + std::chrono::steady_clock::now(); }; - -/** - * @class TimerScheduler - * - * @brief TimerScheduler is a manager class and its responsibility is to - * take care of the time and call the timer_container process method in the - * appropriate time. - * - * @tparam timer_container_type implements a strategy how the timers - * are processed - * @tparam delta_time_type type of a time distance between two events - * @tparam delta_time granularity between the two events, default value is 1 - */ -template <typename timer_container_type, typename delta_time_type, - uint64_t delta_time = 1> -class TimerScheduler { - public: - /** - * Adds a timer. - * - * @param timer shared pointer to the timer object \ref Timer - */ - void add(Timer::sptr timer) { timer_container.add(timer); } - - /** - * Removes a timer. - * - * @param timer shared pointer to the timer object \ref Timer - */ - void remove(Timer::sptr timer) { timer_container.remove(timer); } - - /** - * Provides the number of pending timers. The exact number has to be - * provided by a timer_container. - * - * @return uint64_t the number of pending timers. - */ - uint64_t size() const { return timer_container.size(); } - - /** - * Runs a separate thread which responsibility is to run the process method - * at the appropriate time (every delta_time from the beginning of - * processing. - */ - void run() { - is_running.store(true); - - run_thread = std::thread([this]() { - while (is_running.load()) { - std::this_thread::sleep_for(delta_time_type(delta_time)); - timer_container.process(); - DLOG(INFO) << "timer_container.process()"; - } - }); - } - - /** - * Stops the whole processing. - */ - void stop() { is_running.store(false); } - - /** - * Joins the processing thread. - */ - ~TimerScheduler() { run_thread.join(); } - - private: - timer_container_type timer_container; - std::thread run_thread; - std::atomic<bool> is_running; }; -}