memgraph/src/utils/scheduler.hpp
Dominik Gleich fcecb14545 Replace debug_assert, permanent_assert with DCHECK/CHECK
Summary:
Phase 2.

Phase 3.

Phase 4.

Phase 5.

Complete refactor.

Reviewers: florijan, mislav.bradac

Reviewed By: mislav.bradac

Subscribers: mislav.bradac, pullbot

Differential Revision: https://phabricator.memgraph.io/D895
2017-10-11 14:43:32 +02:00

97 lines
2.7 KiB
C++

#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <ctime>
#include <functional>
#include <thread>
#include "glog/logging.h"
/**
* Class used to run scheduled function execution.
*/
class Scheduler {
public:
Scheduler() {}
/**
* @param pause - Duration between two function executions. If function is
* still running when it should be ran again, it will run right after it
* finishes its previous run.
* @param f - Function
* @Tparam TRep underlying arithmetic type in duration
* @Tparam TPeriod duration in seconds between two ticks
*/
template <typename TRep, typename TPeriod>
void Run(const std::chrono::duration<TRep, TPeriod> &pause,
const std::function<void()> &f) {
DCHECK(is_working_ == false) << "Thread already running.";
DCHECK(pause > std::chrono::seconds(0)) << "Pause is invalid.";
is_working_ = true;
thread_ = std::thread([this, pause, f]() {
auto start_time = std::chrono::system_clock::now();
while (true) {
// First wait then execute the function. We do that in that order
// because most of the schedulers are started at the beginning of the
// program and there is probably no work to do in scheduled function at
// the start of the program. Since Server will log some messages on
// the program start we let him log first and we make sure by first
// waiting that funcion f will not log before it.
std::unique_lock<std::mutex> lk(mutex_);
auto now = std::chrono::system_clock::now();
start_time += pause;
if (start_time > now) {
condition_variable_.wait_for(lk, start_time - now, [&] {
return is_working_.load() == false;
});
} else {
start_time = now;
}
if (!is_working_) break;
f();
}
});
}
/**
* @brief Stops the thread execution. This is a blocking call and may take as
* much time as one call to the function given previously to Run takes.
*/
void Stop() {
is_working_.store(false);
{
std::unique_lock<std::mutex> lk(mutex_);
condition_variable_.notify_one();
}
if (thread_.joinable()) thread_.join();
}
~Scheduler() { Stop(); }
private:
/**
* Variable is true when thread is running.
*/
std::atomic<bool> is_working_{false};
/**
* Mutex used to synchronize threads using condition variable.
*/
std::mutex mutex_;
/**
* Condition variable is used to stop waiting until the end of the
* time interval if destructor is called.
*/
std::condition_variable condition_variable_;
/**
* Thread which runs function.
*/
std::thread thread_;
};