diff --git a/CMakeLists.txt b/CMakeLists.txt index e225b4099..48d16c9e3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -217,6 +217,7 @@ set(memgraph_src_files ${src_dir}/storage/vertex_accessor.cpp ${src_dir}/threading/thread.cpp ${src_dir}/transactions/transaction.cpp + ${src_dir}/utils/watchdog.cpp ) # ----------------------------------------------------------------------------- diff --git a/src/utils/watchdog.cpp b/src/utils/watchdog.cpp new file mode 100644 index 000000000..aa7338d26 --- /dev/null +++ b/src/utils/watchdog.cpp @@ -0,0 +1,67 @@ +#include "watchdog.hpp" + +#include +#include +#include +#include +#include + +#include "glog/logging.h" + +using std::chrono::milliseconds; +using std::chrono::steady_clock; + +Watchdog::Watchdog(const milliseconds &min_timeout, + const milliseconds &max_timeout, + const std::function &callback) + : min_timeout_(min_timeout), + max_timeout_(max_timeout), + generator_(std::random_device{}()), + distribution_(min_timeout.count(), max_timeout_.count()), + callback_(callback), + draining_(false), + blocked_(false) { + DCHECK(min_timeout_ <= max_timeout_) + << "Min timeout should be less than max timeout"; + Notify(); + thread_ = std::thread([this]() { Run(); }); +} + +Watchdog::~Watchdog() { + draining_ = true; + if (thread_.joinable()) { + thread_.join(); + } +} + +void Watchdog::Notify() { + std::lock_guard guard(mutex_); + callback_threshold_ = + steady_clock::now() + milliseconds(distribution_(generator_)); +} + +void Watchdog::Block() { blocked_ = true; } + +void Watchdog::Unblock() { + if (blocked_) { + Notify(); + } + blocked_ = false; +} + +void Watchdog::Run() { + while (!draining_) { + steady_clock::time_point t; + { + std::lock_guard guard(mutex_); + t = callback_threshold_; + } + if (steady_clock::now() > t) { + if (!blocked_) { + callback_(); + } + Notify(); + } + std::this_thread::sleep_until(t); + } +} diff --git a/src/utils/watchdog.hpp b/src/utils/watchdog.hpp new file mode 100644 index 000000000..bdeab48b0 --- /dev/null +++ b/src/utils/watchdog.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief - Keeps track of how long it's been since `Notify` method was + * called. If it wasn't called for a sufficiently long time interval (randomly + * chosen between `min_timeout` and `max_timeout`), the watchdog will + * periodically call `callback` until it is notified or destroyed. + */ +class Watchdog { + public: + Watchdog(const std::chrono::milliseconds &min_timeout, + const std::chrono::milliseconds &max_timeout, + const std::function &callback); + ~Watchdog(); + Watchdog(Watchdog &&) = delete; + Watchdog(const Watchdog &) = delete; + Watchdog &operator=(Watchdog &&) = delete; + Watchdog &operator=(const Watchdog &) = delete; + + void Notify(); + + /** Calling `Block` is equivalent to continuously calling `Notify` + * until `Unblock` is called. + */ + void Block(); + void Unblock(); + + private: + void Run(); + + std::chrono::milliseconds min_timeout_; + std::chrono::milliseconds max_timeout_; + + std::mutex mutex_; + + // Used to generate callback timeouts. + std::mt19937 generator_; + std::uniform_int_distribution distribution_; + std::chrono::steady_clock::time_point callback_threshold_; + + std::function callback_; + + // Used to notify the watchdog loop it should stop. + std::atomic draining_; + std::atomic blocked_; + std::thread thread_; +}; diff --git a/tests/unit/watchdog.cpp b/tests/unit/watchdog.cpp new file mode 100644 index 000000000..0704a317e --- /dev/null +++ b/tests/unit/watchdog.cpp @@ -0,0 +1,59 @@ +#include + +#include "gtest/gtest.h" + +#include "utils/watchdog.hpp" + +using namespace std::chrono_literals; + +TEST(Watchdog, Run) { + std::atomic count(0); + Watchdog dog(200ms, 200ms, [&count]() { ++count; }); + + std::this_thread::sleep_for(250ms); + EXPECT_EQ(count, 1); + + std::this_thread::sleep_for(200ms); + EXPECT_EQ(count, 2); + + std::this_thread::sleep_for(50ms); + dog.Notify(); + + std::this_thread::sleep_for(150ms); + EXPECT_EQ(count, 2); + dog.Notify(); + + std::this_thread::sleep_for(250ms); + EXPECT_EQ(count, 3); +} + +TEST(Watchdog, Blocker) { + std::atomic count(0); + Watchdog dog(200ms, 200ms, [&count]() { ++count; }); + + std::this_thread::sleep_for(250ms); + EXPECT_EQ(count, 1); + + dog.Block(); + + std::this_thread::sleep_for(200ms); + EXPECT_EQ(count, 1); + + std::this_thread::sleep_for(200ms); + EXPECT_EQ(count, 1); + + dog.Unblock(); + + std::this_thread::sleep_for(150ms); + EXPECT_EQ(count, 1); + + std::this_thread::sleep_for(100ms); + EXPECT_EQ(count, 2); + dog.Notify(); + + std::this_thread::sleep_for(100ms); + dog.Unblock(); + + std::this_thread::sleep_for(150ms); + EXPECT_EQ(count, 3); +}