Add Watchdog utility

Summary: see documentation

Reviewers: mislav.bradac, buda, teon.banek, dgleich

Reviewed By: dgleich

Subscribers: teon.banek, dgleich, pullbot

Differential Revision: https://phabricator.memgraph.io/D951
This commit is contained in:
Marin Tomic 2017-11-02 09:59:15 +01:00
parent 0c0ea4e606
commit dddfe52a45
4 changed files with 182 additions and 0 deletions

View File

@ -217,6 +217,7 @@ set(memgraph_src_files
${src_dir}/storage/vertex_accessor.cpp
${src_dir}/threading/thread.cpp
${src_dir}/transactions/transaction.cpp
${src_dir}/utils/watchdog.cpp
)
# -----------------------------------------------------------------------------

67
src/utils/watchdog.cpp Normal file
View File

@ -0,0 +1,67 @@
#include "watchdog.hpp"
#include <atomic>
#include <chrono>
#include <functional>
#include <random>
#include <thread>
#include "glog/logging.h"
using std::chrono::milliseconds;
using std::chrono::steady_clock;
Watchdog::Watchdog(const milliseconds &min_timeout,
const milliseconds &max_timeout,
const std::function<void()> &callback)
: min_timeout_(min_timeout),
max_timeout_(max_timeout),
generator_(std::random_device{}()),
distribution_(min_timeout.count(), max_timeout_.count()),
callback_(callback),
draining_(false),
blocked_(false) {
DCHECK(min_timeout_ <= max_timeout_)
<< "Min timeout should be less than max timeout";
Notify();
thread_ = std::thread([this]() { Run(); });
}
Watchdog::~Watchdog() {
draining_ = true;
if (thread_.joinable()) {
thread_.join();
}
}
void Watchdog::Notify() {
std::lock_guard<std::mutex> guard(mutex_);
callback_threshold_ =
steady_clock::now() + milliseconds(distribution_(generator_));
}
void Watchdog::Block() { blocked_ = true; }
void Watchdog::Unblock() {
if (blocked_) {
Notify();
}
blocked_ = false;
}
void Watchdog::Run() {
while (!draining_) {
steady_clock::time_point t;
{
std::lock_guard<std::mutex> guard(mutex_);
t = callback_threshold_;
}
if (steady_clock::now() > t) {
if (!blocked_) {
callback_();
}
Notify();
}
std::this_thread::sleep_until(t);
}
}

55
src/utils/watchdog.hpp Normal file
View File

@ -0,0 +1,55 @@
#pragma once
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <mutex>
#include <random>
#include <thread>
/**
* @brief - Keeps track of how long it's been since `Notify` method was
* called. If it wasn't called for a sufficiently long time interval (randomly
* chosen between `min_timeout` and `max_timeout`), the watchdog will
* periodically call `callback` until it is notified or destroyed.
*/
class Watchdog {
public:
Watchdog(const std::chrono::milliseconds &min_timeout,
const std::chrono::milliseconds &max_timeout,
const std::function<void()> &callback);
~Watchdog();
Watchdog(Watchdog &&) = delete;
Watchdog(const Watchdog &) = delete;
Watchdog &operator=(Watchdog &&) = delete;
Watchdog &operator=(const Watchdog &) = delete;
void Notify();
/** Calling `Block` is equivalent to continuously calling `Notify`
* until `Unblock` is called.
*/
void Block();
void Unblock();
private:
void Run();
std::chrono::milliseconds min_timeout_;
std::chrono::milliseconds max_timeout_;
std::mutex mutex_;
// Used to generate callback timeouts.
std::mt19937 generator_;
std::uniform_int_distribution<int> distribution_;
std::chrono::steady_clock::time_point callback_threshold_;
std::function<void()> callback_;
// Used to notify the watchdog loop it should stop.
std::atomic<bool> draining_;
std::atomic<bool> blocked_;
std::thread thread_;
};

59
tests/unit/watchdog.cpp Normal file
View File

@ -0,0 +1,59 @@
#include <atomic>
#include "gtest/gtest.h"
#include "utils/watchdog.hpp"
using namespace std::chrono_literals;
TEST(Watchdog, Run) {
std::atomic<int> count(0);
Watchdog dog(200ms, 200ms, [&count]() { ++count; });
std::this_thread::sleep_for(250ms);
EXPECT_EQ(count, 1);
std::this_thread::sleep_for(200ms);
EXPECT_EQ(count, 2);
std::this_thread::sleep_for(50ms);
dog.Notify();
std::this_thread::sleep_for(150ms);
EXPECT_EQ(count, 2);
dog.Notify();
std::this_thread::sleep_for(250ms);
EXPECT_EQ(count, 3);
}
TEST(Watchdog, Blocker) {
std::atomic<int> count(0);
Watchdog dog(200ms, 200ms, [&count]() { ++count; });
std::this_thread::sleep_for(250ms);
EXPECT_EQ(count, 1);
dog.Block();
std::this_thread::sleep_for(200ms);
EXPECT_EQ(count, 1);
std::this_thread::sleep_for(200ms);
EXPECT_EQ(count, 1);
dog.Unblock();
std::this_thread::sleep_for(150ms);
EXPECT_EQ(count, 1);
std::this_thread::sleep_for(100ms);
EXPECT_EQ(count, 2);
dog.Notify();
std::this_thread::sleep_for(100ms);
dog.Unblock();
std::this_thread::sleep_for(150ms);
EXPECT_EQ(count, 3);
}