New version of scheduler.

Summary: New scheduler declaration.

Reviewers: matej.gradicek, buda

Reviewed By: matej.gradicek

Differential Revision: https://phabricator.memgraph.io/D246
This commit is contained in:
Dominik Gleich 2017-04-07 11:33:24 +02:00
parent 74740dd162
commit 2032466e2a
2 changed files with 36 additions and 54 deletions

View File

@ -4,37 +4,37 @@
#include <ctime> #include <ctime>
#include <thread> #include <thread>
#include "utils/assert.hpp"
/** /**
* Class used to run given std::function<void()>. Function is ran and then sleep * Class used to run scheduled function execution. Class is templated with
* is called with parameter given in class constructor. Class is templated with
* mutex class TMutex which is used to synchronize threads. Default template * mutex class TMutex which is used to synchronize threads. Default template
* value is std::mutex. * value is std::mutex.
*/ */
template <typename TMutex = std::mutex> template <typename TMutex = std::mutex>
class Scheduler { class Scheduler {
public: public:
Scheduler() {}
/** /**
* @param t - Time between executing function. If function is still running * @param pause - Time between executing function. If function is still
* when it should be ran again, it will not be ran and next * running when it should be ran again, it will not be ran and next start time
* start time will be increased current time plus t. If t equals * will be increased to current time plus pause.
* -1, scheduler will not run.
* @param f - Function which will be executed. * @param f - Function which will be executed.
*/ */
Scheduler(const std::chrono::seconds &t, const std::function<void()> &f) void Run(const std::chrono::seconds &pause, const std::function<void()> &f) {
: pause_(t), func_(f) { debug_assert(is_working_ == false, "Thread already running.");
thread_ = std::thread([this]() { is_working_ = true;
// if pause_ equals -1, scheduler will not run thread_ = std::thread([this, pause, f]() {
if (pause_ == std::chrono::seconds(-1)) return;
auto start_time = std::chrono::system_clock::now(); auto start_time = std::chrono::system_clock::now();
for (;;) { for (;;) {
if (!is_working_.load()) break; if (!is_working_.load()) break;
func_(); f();
std::unique_lock<std::mutex> lk(mutex_); std::unique_lock<std::mutex> lk(mutex_);
auto now = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now();
while (now >= start_time) start_time += pause_; while (now >= start_time) start_time += pause;
condition_variable_.wait_for( condition_variable_.wait_for(
lk, start_time - now, [&] { return is_working_.load() == false; }); lk, start_time - now, [&] { return is_working_.load() == false; });
@ -43,7 +43,11 @@ class Scheduler {
}); });
} }
~Scheduler() { /**
* @brief Stops the thread execution. This is a blocking call and may take as
* much time as one call to the function given previously to Run takes.
*/
void Stop() {
is_working_.store(false); is_working_.store(false);
{ {
std::unique_lock<std::mutex> lk(mutex_); std::unique_lock<std::mutex> lk(mutex_);
@ -52,19 +56,14 @@ class Scheduler {
if (thread_.joinable()) thread_.join(); if (thread_.joinable()) thread_.join();
} }
~Scheduler() { Stop(); }
private: private:
/** /**
* Variable is true when thread is running. * Variable is true when thread is running.
*/ */
std::atomic<bool> is_working_{true}; std::atomic<bool> is_working_{false};
/**
* Time interval between function execution.
*/
const std::chrono::seconds pause_;
/**
* Function which scheduler executes.
*/
const std::function<void()> func_;
/** /**
* Mutex used to synchronize threads using condition variable. * Mutex used to synchronize threads using condition variable.
*/ */
@ -75,6 +74,7 @@ class Scheduler {
* time interval if destructor is called. * time interval if destructor is called.
*/ */
std::condition_variable condition_variable_; std::condition_variable condition_variable_;
/** /**
* Thread which runs function. * Thread which runs function.
*/ */

View File

@ -1,8 +1,8 @@
#include "gmock/gmock.h" #include "gmock/gmock.h"
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "utils/scheduler.hpp"
#include <atomic> #include <atomic>
#include "utils/scheduler.hpp"
/** /**
* Scheduler runs every 2 seconds and increases one variable. Test thread * Scheduler runs every 2 seconds and increases one variable. Test thread
@ -15,33 +15,15 @@ TEST(Scheduler, TestFunctionExecuting) {
EXPECT_EQ(y.load(), x.load()); EXPECT_EQ(y.load(), x.load());
x++; x++;
}}; }};
Scheduler<> scheduler(std::chrono::seconds(1), func); Scheduler<> scheduler;
scheduler.Run(std::chrono::seconds(1), func);
std::this_thread::sleep_for(std::chrono::milliseconds(980)); std::this_thread::sleep_for(std::chrono::milliseconds(980));
y++; y++;
EXPECT_EQ(x.load(), y.load()); EXPECT_EQ(x.load(), y.load());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::seconds(1)); scheduler.Stop();
y++; y++;
EXPECT_EQ(x.load(), y.load()); EXPECT_EQ(x.load(), y.load());
std::this_thread::sleep_for(std::chrono::seconds(1));
y++;
EXPECT_EQ(x.load(), y.load());
EXPECT_EQ(x.load(), 3);
}
/**
* Scheduler will not run because time is set to -1.
*/
TEST(Scheduler, DoNotRunScheduler) {
std::atomic<int> x{0};
std::function<void()> func{[&x]() { x++; }};
Scheduler<> scheduler(std::chrono::seconds(-1), func);
std::this_thread::sleep_for(std::chrono::seconds(3));
EXPECT_EQ(x.load(), 0);
} }