Migrate timer to use walltime, instead of cputime
Summary: - Move some stuff to poc - Use steady_clock instead of system_clock Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D555
This commit is contained in:
parent
7b7aad196a
commit
50cd53e5c9
@ -4,8 +4,8 @@
|
||||
#include <glog/logging.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "timer.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
using namespace utils;
|
157
poc/timer.hpp
Normal file
157
poc/timer.hpp
Normal file
@ -0,0 +1,157 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <thread>
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
namespace utils {
|
||||
|
||||
/**
|
||||
* @class Timer
|
||||
*
|
||||
* @brief The timer contains counter and handler which is executed when the time
|
||||
* exceedes.
|
||||
*
|
||||
* With every clock interval the counter should be decreased for
|
||||
* delta count. Delta count is one for now but it should be a variable in the
|
||||
* near future. The handler is function that will be called when counter
|
||||
* becomes zero or smaller than zero.
|
||||
*/
|
||||
struct Timer {
|
||||
using sptr = std::shared_ptr<Timer>;
|
||||
using handler_t = std::function<void(void)>;
|
||||
|
||||
Timer(int64_t counter, handler_t handler)
|
||||
: counter(counter), handler(handler) {}
|
||||
|
||||
bool operator--() {
|
||||
if (--counter <= 0)
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t counter;
|
||||
handler_t handler;
|
||||
};
|
||||
|
||||
/**
|
||||
* Timer container knows how to add a new timer and remove the
|
||||
* existing container from itself. Also, time container object
|
||||
* has the process method whose responsibility is to iterate
|
||||
* over existing timers and call the appropriate handler function.
|
||||
* The handler method could be called on the same thread, on a
|
||||
* separate thread or on a thread pool, that is implementation detail of
|
||||
* the process method.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @class TimerSet
|
||||
*
|
||||
* @brief Trivial timer container implementation.
|
||||
*
|
||||
* Internal data stucture for storage of timers is std::set. So, the
|
||||
* related timer complexities are:
|
||||
* insertion: O(log(n))
|
||||
* deletion: O(log(n))
|
||||
* process: O(n)
|
||||
*/
|
||||
class TimerSet {
|
||||
public:
|
||||
void add(Timer::sptr timer) { timers.insert(timer); }
|
||||
|
||||
void remove(Timer::sptr timer) { timers.erase(timer); }
|
||||
|
||||
uint64_t size() const { return timers.size(); }
|
||||
|
||||
void process() {
|
||||
for (auto it = timers.begin(); it != timers.end();) {
|
||||
auto timer = *it;
|
||||
if (--*timer) {
|
||||
timer->handler();
|
||||
it = timers.erase(it);
|
||||
continue;
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::set<std::shared_ptr<Timer>> timers;
|
||||
};
|
||||
|
||||
/**
|
||||
* @class TimerScheduler
|
||||
*
|
||||
* @brief TimerScheduler is a manager class and its responsibility is to
|
||||
* take care of the time and call the timer_container process method in the
|
||||
* appropriate time.
|
||||
*
|
||||
* @tparam timer_container_type implements a strategy how the timers
|
||||
* are processed
|
||||
* @tparam delta_time_type type of a time distance between two events
|
||||
* @tparam delta_time granularity between the two events, default value is 1
|
||||
*/
|
||||
template <typename timer_container_type, typename delta_time_type,
|
||||
uint64_t delta_time = 1>
|
||||
class TimerScheduler {
|
||||
public:
|
||||
/**
|
||||
* Adds a timer.
|
||||
*
|
||||
* @param timer shared pointer to the timer object \ref Timer
|
||||
*/
|
||||
void add(Timer::sptr timer) { timer_container.add(timer); }
|
||||
|
||||
/**
|
||||
* Removes a timer.
|
||||
*
|
||||
* @param timer shared pointer to the timer object \ref Timer
|
||||
*/
|
||||
void remove(Timer::sptr timer) { timer_container.remove(timer); }
|
||||
|
||||
/**
|
||||
* Provides the number of pending timers. The exact number has to be
|
||||
* provided by a timer_container.
|
||||
*
|
||||
* @return uint64_t the number of pending timers.
|
||||
*/
|
||||
uint64_t size() const { return timer_container.size(); }
|
||||
|
||||
/**
|
||||
* Runs a separate thread which responsibility is to run the process method
|
||||
* at the appropriate time (every delta_time from the beginning of
|
||||
* processing.
|
||||
*/
|
||||
void run() {
|
||||
is_running.store(true);
|
||||
|
||||
run_thread = std::thread([this]() {
|
||||
while (is_running.load()) {
|
||||
std::this_thread::sleep_for(delta_time_type(delta_time));
|
||||
timer_container.process();
|
||||
DLOG(INFO) << "timer_container.process()";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the whole processing.
|
||||
*/
|
||||
void stop() { is_running.store(false); }
|
||||
|
||||
/**
|
||||
* Joins the processing thread.
|
||||
*/
|
||||
~TimerScheduler() { run_thread.join(); }
|
||||
|
||||
private:
|
||||
timer_container_type timer_container;
|
||||
std::thread run_thread;
|
||||
std::atomic<bool> is_running;
|
||||
};
|
||||
}
|
@ -72,7 +72,7 @@ GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir)
|
||||
tx_engine_.ForEachActiveTransaction([](tx::Transaction &t) {
|
||||
if (t.creation_time() +
|
||||
std::chrono::seconds(FLAGS_query_execution_time_sec) <
|
||||
std::chrono::system_clock::now()) {
|
||||
std::chrono::steady_clock::now()) {
|
||||
t.set_should_abort();
|
||||
};
|
||||
});
|
||||
|
@ -14,6 +14,7 @@ namespace fs = std::experimental::filesystem;
|
||||
#include "query/plan_interface.hpp"
|
||||
#include "utils/datetime/timestamp.hpp"
|
||||
#include "utils/dynamic_lib.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DECLARE_bool(interpret);
|
||||
DECLARE_string(compile_directory);
|
||||
@ -71,13 +72,18 @@ class QueryEngine {
|
||||
return true;
|
||||
}
|
||||
|
||||
clock_t start_time = clock();
|
||||
utils::Timer parsing_timer;
|
||||
query::StrippedQuery stripped(query);
|
||||
clock_t end_parsing_time = clock();
|
||||
auto parsing_time = parsing_timer.Elapsed();
|
||||
|
||||
utils::Timer planning_timer;
|
||||
auto plan = LoadCypher(stripped);
|
||||
clock_t end_planning_time = clock();
|
||||
auto planning_time = planning_timer.Elapsed();
|
||||
|
||||
utils::Timer execution_timer;
|
||||
auto result = plan->run(db_accessor, stripped.literals(), stream);
|
||||
clock_t end_execution_time = clock();
|
||||
auto execution_time = execution_timer.Elapsed();
|
||||
|
||||
if (UNLIKELY(!result)) {
|
||||
// info because it might be something like deadlock in which
|
||||
// case one thread is stopped and user has try again
|
||||
@ -91,13 +97,11 @@ class QueryEngine {
|
||||
};
|
||||
|
||||
std::map<std::string, query::TypedValue> summary;
|
||||
summary["query_parsing_time"] = time_second(start_time, end_parsing_time);
|
||||
summary["query_parsing_time"] = parsing_time.count();
|
||||
// This doesn't do any actual planning, but benchmarking harness knows how
|
||||
// to work with this field.
|
||||
summary["query_planning_time"] =
|
||||
time_second(end_parsing_time, end_planning_time);
|
||||
summary["query_plan_execution_time"] =
|
||||
time_second(end_planning_time, end_execution_time);
|
||||
summary["query_planning_time"] = planning_time.count();
|
||||
summary["query_plan_execution_time"] = execution_time.count();
|
||||
summary["type"] = "rw";
|
||||
stream.Summary(summary);
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "query/interpret/frame.hpp"
|
||||
#include "query/plan/cost_estimator.hpp"
|
||||
#include "query/plan/planner.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
// TODO: Remove ast_cache flag and add flag that limits cache size.
|
||||
DECLARE_bool(ast_cache);
|
||||
@ -28,8 +29,7 @@ class Interpreter {
|
||||
template <typename Stream>
|
||||
void Interpret(const std::string &query, GraphDbAccessor &db_accessor,
|
||||
Stream &stream) {
|
||||
clock_t start_time = clock();
|
||||
|
||||
utils::Timer frontend_timer;
|
||||
Config config;
|
||||
Context ctx(config, db_accessor);
|
||||
std::map<std::string, TypedValue> summary;
|
||||
@ -69,9 +69,9 @@ class Interpreter {
|
||||
}
|
||||
return it->second.Plug(stripped.literals(), stripped.named_expressions());
|
||||
}();
|
||||
auto frontend_time = frontend_timer.Elapsed();
|
||||
|
||||
clock_t frontend_end_time = clock();
|
||||
|
||||
utils::Timer planning_timer;
|
||||
// symbol table fill
|
||||
SymbolTable symbol_table;
|
||||
SymbolGenerator symbol_generator(symbol_table);
|
||||
@ -106,9 +106,9 @@ class Interpreter {
|
||||
|
||||
// generate frame based on symbol table max_position
|
||||
Frame frame(symbol_table.max_position());
|
||||
auto planning_time = planning_timer.Elapsed();
|
||||
|
||||
clock_t planning_end_time = clock();
|
||||
|
||||
utils::Timer execution_timer;
|
||||
std::vector<std::string> header;
|
||||
std::vector<Symbol> output_symbols(
|
||||
logical_plan->OutputSymbols(symbol_table));
|
||||
@ -144,19 +144,16 @@ class Interpreter {
|
||||
} else {
|
||||
throw QueryRuntimeException("Unknown top level LogicalOperator");
|
||||
}
|
||||
|
||||
clock_t execution_end_time = clock();
|
||||
auto execution_time = execution_timer.Elapsed();
|
||||
|
||||
// helper function for calculating time in seconds
|
||||
auto time_second = [](clock_t start, clock_t end) {
|
||||
return TypedValue(double(end - start) / CLOCKS_PER_SEC);
|
||||
};
|
||||
|
||||
summary["query_parsing_time"] = time_second(start_time, frontend_end_time);
|
||||
summary["query_planning_time"] =
|
||||
time_second(frontend_end_time, planning_end_time);
|
||||
summary["query_plan_execution_time"] =
|
||||
time_second(planning_end_time, execution_end_time);
|
||||
summary["query_parsing_time"] = frontend_time.count();
|
||||
summary["query_planning_time"] = planning_time.count();
|
||||
summary["query_plan_execution_time"] = execution_time.count();
|
||||
summary["query_cost_estimate"] = query_plan_cost_estimation;
|
||||
|
||||
// TODO: set summary['type'] based on transaction metadata
|
||||
|
@ -10,7 +10,7 @@
|
||||
#include <unistd.h>
|
||||
|
||||
#include "threading/sync/lock_timeout_exception.hpp"
|
||||
#include "utils/cpu_relax.hpp"
|
||||
#include "threading/sync/cpu_relax.hpp"
|
||||
|
||||
namespace sys {
|
||||
inline int futex(void *addr1, int op, int val1, const struct timespec *timeout,
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <unistd.h>
|
||||
#include <atomic>
|
||||
|
||||
#include "utils/cpu_relax.hpp"
|
||||
#include "threading/sync/cpu_relax.hpp"
|
||||
|
||||
/**
|
||||
* @class SpinLock
|
||||
|
@ -83,7 +83,7 @@ class Transaction {
|
||||
// should stop execution, it is only a hint, transaction can disobey.
|
||||
std::atomic<bool> should_abort_{false};
|
||||
// Creation time.
|
||||
const std::chrono::time_point<std::chrono::system_clock> creation_time_{
|
||||
std::chrono::system_clock::now()};
|
||||
const std::chrono::time_point<std::chrono::steady_clock> creation_time_{
|
||||
std::chrono::steady_clock::now()};
|
||||
};
|
||||
}
|
||||
|
@ -1,157 +1,18 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <thread>
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
namespace utils {
|
||||
|
||||
/**
|
||||
* @class Timer
|
||||
*
|
||||
* @brief The timer contains counter and handler which is executed when the time
|
||||
* exceedes.
|
||||
*
|
||||
* With every clock interval the counter should be decreased for
|
||||
* delta count. Delta count is one for now but it should be a variable in the
|
||||
* near future. The handler is function that will be called when counter
|
||||
* becomes zero or smaller than zero.
|
||||
*/
|
||||
struct Timer {
|
||||
using sptr = std::shared_ptr<Timer>;
|
||||
using handler_t = std::function<void(void)>;
|
||||
|
||||
Timer(int64_t counter, handler_t handler)
|
||||
: counter(counter), handler(handler) {}
|
||||
|
||||
bool operator--() {
|
||||
if (--counter <= 0)
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t counter;
|
||||
handler_t handler;
|
||||
};
|
||||
|
||||
/**
|
||||
* Timer container knows how to add a new timer and remove the
|
||||
* existing container from itself. Also, time container object
|
||||
* has the process method whose responsibility is to iterate
|
||||
* over existing timers and call the appropriate handler function.
|
||||
* The handler method could be called on the same thread, on a
|
||||
* separate thread or on a thread pool, that is implementation detail of
|
||||
* the process method.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @class TimerSet
|
||||
*
|
||||
* @brief Trivial timer container implementation.
|
||||
*
|
||||
* Internal data stucture for storage of timers is std::set. So, the
|
||||
* related timer complexities are:
|
||||
* insertion: O(log(n))
|
||||
* deletion: O(log(n))
|
||||
* process: O(n)
|
||||
*/
|
||||
class TimerSet {
|
||||
class Timer {
|
||||
public:
|
||||
void add(Timer::sptr timer) { timers.insert(timer); }
|
||||
|
||||
void remove(Timer::sptr timer) { timers.erase(timer); }
|
||||
|
||||
uint64_t size() const { return timers.size(); }
|
||||
|
||||
void process() {
|
||||
for (auto it = timers.begin(); it != timers.end();) {
|
||||
auto timer = *it;
|
||||
if (--*timer) {
|
||||
timer->handler();
|
||||
it = timers.erase(it);
|
||||
continue;
|
||||
}
|
||||
++it;
|
||||
}
|
||||
/** Time elapsed since creation. */
|
||||
std::chrono::duration<double> Elapsed() {
|
||||
return std::chrono::steady_clock::now() - start_time_;
|
||||
}
|
||||
|
||||
private:
|
||||
std::set<std::shared_ptr<Timer>> timers;
|
||||
std::chrono::time_point<std::chrono::steady_clock> start_time_ =
|
||||
std::chrono::steady_clock::now();
|
||||
};
|
||||
|
||||
/**
|
||||
* @class TimerScheduler
|
||||
*
|
||||
* @brief TimerScheduler is a manager class and its responsibility is to
|
||||
* take care of the time and call the timer_container process method in the
|
||||
* appropriate time.
|
||||
*
|
||||
* @tparam timer_container_type implements a strategy how the timers
|
||||
* are processed
|
||||
* @tparam delta_time_type type of a time distance between two events
|
||||
* @tparam delta_time granularity between the two events, default value is 1
|
||||
*/
|
||||
template <typename timer_container_type, typename delta_time_type,
|
||||
uint64_t delta_time = 1>
|
||||
class TimerScheduler {
|
||||
public:
|
||||
/**
|
||||
* Adds a timer.
|
||||
*
|
||||
* @param timer shared pointer to the timer object \ref Timer
|
||||
*/
|
||||
void add(Timer::sptr timer) { timer_container.add(timer); }
|
||||
|
||||
/**
|
||||
* Removes a timer.
|
||||
*
|
||||
* @param timer shared pointer to the timer object \ref Timer
|
||||
*/
|
||||
void remove(Timer::sptr timer) { timer_container.remove(timer); }
|
||||
|
||||
/**
|
||||
* Provides the number of pending timers. The exact number has to be
|
||||
* provided by a timer_container.
|
||||
*
|
||||
* @return uint64_t the number of pending timers.
|
||||
*/
|
||||
uint64_t size() const { return timer_container.size(); }
|
||||
|
||||
/**
|
||||
* Runs a separate thread which responsibility is to run the process method
|
||||
* at the appropriate time (every delta_time from the beginning of
|
||||
* processing.
|
||||
*/
|
||||
void run() {
|
||||
is_running.store(true);
|
||||
|
||||
run_thread = std::thread([this]() {
|
||||
while (is_running.load()) {
|
||||
std::this_thread::sleep_for(delta_time_type(delta_time));
|
||||
timer_container.process();
|
||||
DLOG(INFO) << "timer_container.process()";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the whole processing.
|
||||
*/
|
||||
void stop() { is_running.store(false); }
|
||||
|
||||
/**
|
||||
* Joins the processing thread.
|
||||
*/
|
||||
~TimerScheduler() { run_thread.join(); }
|
||||
|
||||
private:
|
||||
timer_container_type timer_container;
|
||||
std::thread run_thread;
|
||||
std::atomic<bool> is_running;
|
||||
};
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user