Remove unused utils

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2636
This commit is contained in:
Matej Ferencevic 2020-01-23 16:06:13 +01:00
parent 4f16b814c7
commit 6d10d90d98
18 changed files with 1 additions and 1228 deletions

View File

@ -16,7 +16,6 @@
#include "query/typed_value.hpp"
#include "storage/v2/id_types.hpp"
#include "utils/bound.hpp"
#include "utils/future.hpp"
#include "utils/hashing/fnv.hpp"
#include "utils/memory.hpp"
#include "utils/visitor.hpp"

View File

@ -1,11 +1,9 @@
set(utils_src_files
demangle.cpp
file.cpp
memory.cpp
signals.cpp
thread.cpp
uuid.cpp
watchdog.cpp)
uuid.cpp)
add_library(mg-utils STATIC ${utils_src_files})
target_link_libraries(mg-utils stdc++fs Threads::Threads fmt glog gflags uuid)

View File

@ -1,18 +0,0 @@
#include "utils/demangle.hpp"
#include <cxxabi.h>
namespace utils {
std::optional<std::string> Demangle(const char *mangled_name) {
int s;
char *type_name = abi::__cxa_demangle(mangled_name, nullptr, nullptr, &s);
std::optional<std::string> ret = std::nullopt;
if (s == 0) {
ret = type_name;
free(type_name);
}
return ret;
}
} // namespace utils

View File

@ -1,17 +0,0 @@
/**
* @file
*/
#pragma once
#include <optional>
#include <string>
namespace utils {
/**
* Converts a mangled name to a human-readable name using abi::__cxa_demangle.
* Returns nullopt if the conversion failed.
*/
std::optional<std::string> Demangle(const char *mangled_name);
} // namespace utils

View File

@ -1,132 +0,0 @@
#pragma once
#include <dlfcn.h>
#include <filesystem>
namespace fs = std::filesystem;
#include <stdexcept>
#include <string>
#include <glog/logging.h>
#include "utils/exceptions.hpp"
namespace utils {
/**
* @brief Exception raised by @c DynamicLib.
*/
class DynamicLibException : public utils::BasicException {
public:
using utils::BasicException::BasicException;
};
/**
* DynamicLib is a wrapper aroung dynamic object returned by dlopen.
*
* Dynamic object must have extern C functions which names should be
* "produce" and "destruct" (that's by convention).
*
* The functions prototypes can be defined with template parameter
* type trait (T).
*
* DynamicLib isn't implemented for concurrent access.
*
* @tparam T type trait which defines the prototypes of extern C functions
* of undelying dynamic object.
*/
template <typename T>
class DynamicLib {
public:
/**
* Initializes dynamic library (loads lib, produce and
* destruct functions)
*
* @param lib_path file system path to dynamic library
*/
DynamicLib(const fs::path &lib_path)
: lib_path(lib_path), lib_object(nullptr) {
// load dynamic lib
// I've added the RTL_DEEPBIND flag when we are opening the dynamic_lib to
// resolve symbols locally instead of globally. For additional information
// take a look at: http://man7.org/linux/man-pages/man3/dlopen.3.html
dynamic_lib = dlopen(lib_path.c_str(), RTLD_NOW | RTLD_DEEPBIND);
if (!dynamic_lib) throw DynamicLibException(dlerror());
dlerror(); /* Clear any existing error */
DLOG(INFO) << "dynamic lib at ADDRESS " << dynamic_lib << " was opened";
// load produce method
this->produce_method =
(typename T::ProducePrototype)dlsym(dynamic_lib, "produce");
const char *dlsym_produce_error = dlerror();
if (dlsym_produce_error) throw DynamicLibException(dlsym_produce_error);
// load destruct method
this->destruct_method =
(typename T::DestructPrototype)dlsym(dynamic_lib, "destruct");
const char *dlsym_destruct_error = dlerror();
if (dlsym_destruct_error) throw DynamicLibException(dlsym_destruct_error);
}
// becuase we are dealing with pointers
// and conceptualy there is no need to copy the instance of this class
// the copy constructor and copy assignment operator are deleted
// the same applies to move methods
DynamicLib(const DynamicLib &other) = delete;
DynamicLib &operator=(const DynamicLib &other) = delete;
DynamicLib(DynamicLib &&other) = delete;
DynamicLib &operator=(DynamicLib &&other) = delete;
/**
* Singleton method. Returns the same instance of underlying class
* for every call. The instance of underlying class is returned by
* extern C produce function.
*
* @return T the instance of lib class
*/
typename T::ObjectPrototype *instance() {
if (lib_object == nullptr) lib_object = this->produce_method();
return lib_object;
}
/**
* Clean underlying object and close the lib
*/
~DynamicLib() {
// first destroy underlying object
if (lib_object != nullptr) {
DLOG(INFO) << "shared object at ADDRESS " << (void *)lib_object
<< " will be destroyed.";
this->destruct_method(lib_object);
}
// then destroy dynamic lib
DLOG(INFO) << "unloading lib " << lib_path.c_str();
if (dynamic_lib != nullptr) {
DLOG(INFO) << "closing dynamic lib ADDRESS " << (void *)dynamic_lib;
// // IMPORTANT: weird problem the program SEGFAULT on dlclose
// // TODO: FIX somehow
// // maybe something similar is:
// //
// http://stackoverflow.com/questions/6450828/segmentation-fault-when-using-dlclose-on-android-platform
// // for now it is not crucial so I've created a task for that
// // ! 0 is success
// Return early because dlclose seems to be casuing the problem again. So
// strange.
return;
int closing_status = dlclose(dynamic_lib);
if (closing_status != 0) throw DynamicLibException(dlerror());
} else {
DLOG(WARNING) << "unload lib was called but lib ptr is null";
}
}
private:
std::string lib_path;
void *dynamic_lib;
typename T::ObjectPrototype *lib_object;
typename T::ProducePrototype produce_method;
typename T::DestructPrototype destruct_method;
};
} // namespace utils

View File

@ -1,472 +0,0 @@
/**
* Memgraph Ltd
*
* File System Watcher
*
* @author Marko Budiselic
*/
#pragma once
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <sys/inotify.h>
#include <unistd.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>
#include <filesystem>
namespace fs = std::filesystem;
#include <glog/logging.h>
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
#include "utils/likely.hpp"
#include "utils/underlying_cast.hpp"
namespace utils::linux_os {
void set_non_blocking(int fd) {
auto flags = fcntl(fd, F_GETFL, 0);
if (UNLIKELY(flags == -1))
throw BasicException("Cannot read flags from file descriptor.");
flags |= O_NONBLOCK;
auto status = fcntl(fd, F_SETFL, flags);
if (UNLIKELY(status == -1))
throw BasicException("Can't set NON_BLOCK flag to file descriptor");
}
/**
* Goes from first to last item in a container, if an element satisfying the
* predicate then the action is going to be executed and the element is going
* to be shifted to the end of the container.
*
* @tparam ForwardIt type of forward iterator
* @tparam UnaryPredicate type of predicate
* @tparam Action type of action
*
* @return a past-the-end iterator for the new end of the range
*/
template <class ForwardIt, class UnaryPredicate, class Action>
ForwardIt action_remove_if(ForwardIt first, ForwardIt last, UnaryPredicate p,
Action a) {
auto it = std::remove_if(first, last, p);
if (it == last) return it;
std::for_each(it, last, a);
return it;
}
using ms = std::chrono::milliseconds;
/**
* watch descriptor type, on linux it is int
*/
using os_wd_t = int;
/**
* machine event mask type, on linux it is inotify mask type which is int
*/
using os_mask_t = int;
// NOTE: for now the types are linux dependent, once when the code base will be
// compiled for another OS figure out the most appropriate solution
/**
* inotify buffer length (10 events)
*/
using in_event_t = struct inotify_event;
constexpr uint64_t IN_HEADER_SIZE = sizeof(struct inotify_event);
/**
* The reason why here is 10 is because the memory space for the data
* has to be upfront reserved (C API). I've picked up 10 because it seems like
* a reasonable size and doesn't have to be configurable before compile or run
* time.
*/
constexpr uint64_t IN_BUFF_SLOT_LEN = IN_HEADER_SIZE + NAME_MAX + 1;
constexpr uint64_t IN_BUFF_LEN = 10 * IN_BUFF_SLOT_LEN;
/**
* File System Event Type - abstraction for underlying event types
*/
enum class FSEventType : os_mask_t {
Accessed = IN_ACCESS,
MetadataChanged = IN_ATTRIB,
CloseWrite = IN_CLOSE_WRITE,
CloseNowrite = IN_CLOSE_NOWRITE,
Created = IN_CREATE,
Deleted = IN_DELETE,
DeletedSelf = IN_DELETE_SELF,
Modified = IN_MODIFY,
Renamed = IN_MOVE_SELF,
MovedFrom = IN_MOVED_FROM,
MovedTo = IN_MOVED_TO,
Close = IN_CLOSE,
Opened = IN_OPEN,
Ignored = IN_IGNORED,
All = Accessed | MetadataChanged | CloseWrite | CloseNowrite | Created |
Deleted | DeletedSelf | Modified | Renamed | MovedFrom | MovedTo |
Close | Opened | Ignored
};
inline FSEventType operator|(FSEventType lhs, FSEventType rhs) {
return (FSEventType)(underlying_cast(lhs) | underlying_cast(rhs));
}
/**
* @struct FSEventBase
*
* Base DTO object.
*
* In derived classes the path can represent eather directory or path to a file.
* In derived classes the type can represent a set of event types or specific
* event.
*/
struct FSEventBase {
FSEventBase(const fs::path &path, const FSEventType type)
: path(path), type(type) {}
fs::path path;
FSEventType type;
operator os_mask_t() { return underlying_cast(type); }
};
/**
* @struct WatchDescriptor
*
* The purpose of this struct is to register new watchers for specific
* directory and event type.
*/
struct WatchDescriptor : public FSEventBase {
WatchDescriptor(const fs::path &directory, const FSEventType type)
: FSEventBase(directory, type) {
DCHECK(fs::is_directory(path)) << "The path parameter should be directory";
}
};
/**
* @struct FSEvent
*
* The purpose of this struct is to carry information about a new fs event.
* In this case path is a path to the affected file and type is the event type.
*/
struct FSEvent : public FSEventBase {
FSEvent(const fs::path &directory, const fs::path &filename,
const FSEventType type)
: FSEventBase(directory / filename, type) {}
};
/**
* Custom FSWatcher Exception
*/
class FSWatcherException : public StacktraceException {
public:
using StacktraceException::StacktraceException;
};
/**
* File System Watcher
*
* The idea is to create wrapper for inotify or any other file system
* notification system.
*
* The object is not thread safe.
*
* parameters:
* * interval - time between two checks for the new file system events
*/
class FSWatcher {
/**
* callback type (the code that will be notified will be notified
* through callback of this type
*/
using callback_t = std::function<void(FSEvent event)>;
public:
/**
* Initialize underlying notification system.
*/
explicit FSWatcher(ms check_interval = ms(100))
: check_interval_(check_interval) {
DLOG(INFO) << fmt::format("Inotify header length: {}", IN_HEADER_SIZE);
DLOG(INFO) << fmt::format("Inotify buffer length: {}", IN_BUFF_LEN);
inotify_fd_ = inotify_init();
if (inotify_fd_ == -1)
throw FSWatcherException("Unable to initialize inotify\n");
linux_os::set_non_blocking(inotify_fd_);
}
~FSWatcher() {
DLOG(INFO) << "destructor call";
unwatchAll();
}
/*
* copy and move constructors and assignemnt operators are deleted because
* std::atomic can't be copied or moved
*/
FSWatcher(const FSWatcher &other) = delete;
FSWatcher(FSWatcher &&other) = delete;
FSWatcher &operator=(const FSWatcher &) = delete;
FSWatcher &operator=(FSWatcher &&) = delete;
/**
* Add Watcher
*/
void watch(WatchDescriptor descriptor, callback_t callback) {
stop();
os_wd_t wd =
inotify_add_watch(inotify_fd_, descriptor.path.c_str(), descriptor);
if (wd == -1) {
switch (errno) {
case EACCES:
throw FSWatcherException(
"Unable to add watcher. Read access "
"to the given file is not permitted.");
case EBADF:
throw FSWatcherException(
"Unable to add watcher. The given "
"file descriptor is not valid.");
case EFAULT:
throw FSWatcherException(
"Unable to add watcher. pathname "
"points outside of the process's "
"accessible address space");
case EINVAL:
throw FSWatcherException(
"Unable to add watcher. The given event mask contains no "
"valid events; or fd is not an inotify file descriptor.");
case ENAMETOOLONG:
throw FSWatcherException(
"Unable to add watcher. pathname is too long.");
case ENOENT:
throw FSWatcherException(
"Unable to add watcher. A directory "
"component in pathname does not exist "
"or is a dangling symbolic link.");
case ENOMEM:
throw FSWatcherException(
"Unable to add watcher. Insufficient "
"kernel memory was available.");
case ENOSPC:
throw FSWatcherException(
"Unable to add watcher. The user limit on the total number "
"of inotify watches was reached or the kernel failed to "
"allocate a needed resource.");
default:
throw FSWatcherException(
"Unable to add watcher. Unknown Linux API error.");
}
}
// update existing
auto it = std::find_if(entries_.begin(), entries_.end(),
[wd](Entry &entry) { return wd == entry.os_wd; });
if (it != entries_.end()) {
it->descriptor = descriptor;
it->callback = callback;
} else {
entries_.emplace_back(Entry(wd, descriptor, callback));
}
DLOG(INFO) << fmt::format("REGISTERED: wd({}) for path({}) and mask ({})",
wd, descriptor.path.c_str(),
(os_mask_t)(descriptor));
start();
}
/**
* Remove subscriber on specified path and type.
*
* Time complexity: O(n) (where n is number of entries)
*/
void unwatch(WatchDescriptor descriptor) {
stop();
auto it = action_remove_if(
entries_.begin(), entries_.end(),
[&descriptor](Entry entry) {
auto stored_descriptor = entry.descriptor;
if (stored_descriptor.path != descriptor.path) return false;
if (stored_descriptor.type != descriptor.type) return false;
return true;
},
[this](Entry entry) { remove_underlaying_watcher(entry); });
if (it != entries_.end()) entries_.erase(it);
if (entries_.size() > 0) start();
}
/**
* Removes all subscribers and stops the watching process.
*/
void unwatchAll() {
stop();
if (entries_.size() <= 0) return;
entries_.erase(action_remove_if(
entries_.begin(), entries_.end(), [](Entry) { return true; },
[this](Entry &entry) { remove_underlaying_watcher(entry); }));
}
/**
* Start the watching process.
*/
void start() {
if (is_running_.load()) return;
is_running_.store(true);
// run separate thread
dispatch_thread_ = std::thread([this]() {
DLOG(INFO) << "dispatch thread - start";
while (is_running_.load()) {
std::this_thread::sleep_for(check_interval_);
// read file descriptor and process new events
// the read call should be non blocking otherwise
// this thread can easily be blocked forever
auto n = ::read(inotify_fd_, buffer_, IN_BUFF_LEN);
if (n == 0) throw FSWatcherException("read() -> 0.");
if (n == -1) continue;
DLOG(INFO) << fmt::format("Read {} bytes from inotify fd", (long)n);
// process all of the events in buffer returned by read()
for (auto p = buffer_; p < buffer_ + n;) {
// get in_event
auto in_event = reinterpret_cast<in_event_t *>(p);
auto in_event_length = IN_HEADER_SIZE + in_event->len;
// sometimes inotify system returns event->len that is
// longer then the length of the buffer
// TODO: figure out why (it is not easy)
if (((p - buffer_) + in_event_length) > IN_BUFF_LEN) break;
// here should be an assertion
// DCHECK(in_event_length <= IN_BUFF_SLOT_LEN) <<
// "Inotify event length cannot be bigger
// than "
// "Inotify slot length";
// skip if in_event is undefined OR is equal to IN_IGNORED
if ((in_event->len == 0 && in_event->mask == 0) ||
in_event->mask == IN_IGNORED ||
in_event_length == IN_HEADER_SIZE) // skip empty paths
{
p += in_event_length;
continue;
}
DLOG(INFO) << fmt::format("LEN: {}, MASK: {}, NAME: {}",
in_event_length, in_event->mask,
in_event->name);
// find our watch descriptor
auto entry = find_if(
entries_.begin(), entries_.end(),
[in_event](Entry &entry) { return entry.os_wd == in_event->wd; });
auto &descriptor = entry->descriptor;
// call user's callback
entry->callback(FSEvent(descriptor.path, fs::path(in_event->name),
static_cast<FSEventType>(in_event->mask)));
// move on
p += in_event_length;
}
}
DLOG(INFO) << "dispatch thread - finish";
});
}
/**
* Stop the watching process.
*/
void stop() {
if (is_running_.load()) {
is_running_.store(false);
dispatch_thread_.join();
}
}
/**
* @return check interval - time between two underlaying check calls
*/
ms check_interval() const { return check_interval_; }
/**
* @return number of entries
*/
size_t size() const { return entries_.size(); }
private:
/**
* Internal storage for all subscribers.
*
* <os watch descriptor, API watch descriptor, callback>
*/
struct Entry {
Entry(os_wd_t os_wd, WatchDescriptor descriptor, callback_t callback)
: os_wd(os_wd), descriptor(descriptor), callback(callback) {}
os_wd_t os_wd;
WatchDescriptor descriptor;
callback_t callback;
};
/**
* Removes the os specific watch descriptor.
*/
void remove_underlaying_watcher(Entry entry) {
auto status = inotify_rm_watch(inotify_fd_, entry.os_wd);
if (status == -1)
throw FSWatcherException("Unable to remove underlaying watch.");
else
DLOG(INFO) << fmt::format("UNREGISTER: fd({}), wd({}), status({})",
inotify_fd_, entry.os_wd, status);
}
/**
* inotify file descriptor
*/
int inotify_fd_;
/**
* running flag, has to be atomic because two threads can update and read
* the value
*/
std::atomic<bool> is_running_;
/**
* interval between the end of events processing and next start of
* processing
*/
ms check_interval_;
/**
*/
std::vector<Entry> entries_;
/**
* thread for events processing
*/
std::thread dispatch_thread_;
/**
* buffer for underlying events (inotify dependent)
*/
char *buffer_[IN_BUFF_LEN];
};
} // namespace utils::linux_os

View File

@ -1,46 +0,0 @@
#pragma once
/// @file
#include <future>
namespace utils {
/// Wraps an `std::future` object to ensure that upon destruction the
/// `std::future` is waited on.
template <typename TResult>
class Future {
public:
Future() {}
explicit Future(std::future<TResult> future) : future_(std::move(future)) {}
Future(const Future &) = delete;
Future(Future &&) = default;
Future &operator=(const Future &) = delete;
Future &operator=(Future &&) = default;
~Future() {
if (future_.valid()) future_.wait();
}
/// Returns true if the future has the result available. NOTE: The behaviour
/// is undefined if future isn't valid, i.e. `future.valid() == false`.
bool IsReady() const {
auto status = future_.wait_for(std::chrono::seconds(0));
return status == std::future_status::ready;
}
auto get() { return future_.get(); }
auto wait() { return future_.wait(); }
auto valid() { return future_.valid(); }
private:
std::future<TResult> future_;
};
/// Creates a `Future` from the given `std::future`.
template <typename TResult>
Future<TResult> make_future(std::future<TResult> future) {
return Future<TResult>(std::move(future));
}
} // namespace utils

View File

@ -1,95 +0,0 @@
#pragma once
#include <chrono>
#include <ratio>
#include <utility>
namespace utils {
using ms = std::chrono::milliseconds;
/**
* Casts the time difference into DurationUnit (default values is
* std::chrono::nanoseconds).
*
* @tparam DurationUnit type of return value
* @param delta time difference
* @return time difference in DurationUnit
*/
template <typename DurationUnit = std::chrono::nanoseconds>
auto to_duration(const std::chrono::duration<long, std::nano> &delta) {
return std::chrono::duration_cast<DurationUnit>(delta).count();
}
/**
* Measures time for the function call.
*
* @tparam DurationUnit unit of returned value
* @param func function to execute
* @param args function arguments
* @return duration of a function call
*/
template <typename DurationUnit, typename F, typename... Args>
auto measure_time(F func, Args &&... args) {
auto start_time = std::chrono::high_resolution_clock::now();
func(std::forward<Args>(args)...);
return to_duration<DurationUnit>(std::chrono::high_resolution_clock::now() -
start_time);
}
/**
* @brief Stopwatch
*
* idea from Optimized C++ Kurt Guntheroth 2016
*
* The instance of this class can return delta time from a single start point.
* The start point in time is stored within the object.
*/
class Stopwatch {
public:
/**
* Initializes start point to system clock min.
*/
Stopwatch() : start_(std::chrono::system_clock::time_point::min()) {}
/**
* Set start point to system clock min.
*/
void Clear() { start_ = std::chrono::system_clock::time_point::min(); }
/**
* If start isn't system clock min than the Stopwatch has been started.
*
* @return bool is the Stopwatch started.
*/
auto IsStarted() const {
return (start_ != std::chrono::system_clock::time_point::min());
}
/**
* Set start point to the current time.
*/
void Start() { start_ = std::chrono::system_clock::now(); }
/**
* @return elapsed time in milliseconds
* if the Stopwatch isn't active returns 0
*/
auto GetMs() {
if (IsStarted()) {
std::chrono::system_clock::duration diff;
diff = std::chrono::system_clock::now() - start_;
return std::chrono::duration_cast<std::chrono::milliseconds>(diff);
}
return std::chrono::milliseconds(0);
}
/**
* Empty destructor.
*/
~Stopwatch() {}
private:
std::chrono::system_clock::time_point start_;
};
}

View File

@ -1,81 +0,0 @@
#include <random>
#include <set>
#include <vector>
namespace utils::random {
template <class Distribution, class Generator>
class RandomGenerator {
private:
std::random_device device_;
protected:
Generator gen_;
Distribution dist_;
public:
RandomGenerator(Distribution dist) : gen_(device_()), dist_(dist) {}
};
class StringGenerator
: public RandomGenerator<std::uniform_int_distribution<int>,
std::default_random_engine> {
private:
int size_;
public:
explicit StringGenerator(int size)
: RandomGenerator(std::uniform_int_distribution<int>(32, 126)),
size_(size) {}
std::string next(int size) {
std::string random_string;
random_string.reserve(size);
for (int i = 0; i < size; i++) random_string += (dist_(gen_) + '\0');
return random_string;
}
std::string next() { return next(size_); }
};
template <class Distribution, class Generator, class DistributionRangeType>
class NumberGenerator : public RandomGenerator<Distribution, Generator> {
public:
NumberGenerator(DistributionRangeType start, DistributionRangeType end)
: RandomGenerator<Distribution, Generator>(Distribution(start, end)) {}
auto next() { return this->dist_(this->gen_); }
};
template <class FirstGenerator, class SecondGenerator>
class PairGenerator {
private:
FirstGenerator *first_;
SecondGenerator *second_;
public:
PairGenerator(FirstGenerator *first, SecondGenerator *second)
: first_(first), second_(second) {}
auto next() { return std::make_pair(first_->next(), second_->next()); }
};
template <class RandomGenerator>
auto generate_vector(RandomGenerator &gen, int size) {
std::vector<decltype(gen.next())> elements(size);
for (int i = 0; i < size; i++) elements[i] = gen.next();
return elements;
}
// IMPORTANT
// be careful with RandomGenerator ranges and set size
// condition must be valid: size(set) << range(RandomGenerator)
template <class RandomGenerator>
auto generate_set(RandomGenerator &gen, int size) {
std::set<decltype(gen.next())> elements;
while (elements.size() < size) elements.insert(gen.next());
return elements;
}
} // namespace utils::random

View File

@ -2,7 +2,6 @@
#include <sys/prctl.h>
#include <fmt/format.h>
#include <glog/logging.h>
namespace utils {
@ -13,32 +12,4 @@ void ThreadSetName(const std::string &name) {
<< "Couldn't set thread name: " << name << "!";
}
ThreadPool::ThreadPool(size_t threads, const std::string &name) {
for (size_t i = 0; i < threads; ++i)
workers_.emplace_back([this, name, i] {
ThreadSetName(fmt::format("{} {}", name, i + 1));
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
cvar_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
ThreadPool::~ThreadPool() {
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
lock.unlock();
cvar_.notify_all();
for (std::thread &worker : workers_) {
if (worker.joinable()) worker.join();
}
}
} // namespace utils

View File

@ -1,19 +1,7 @@
/// @file
#pragma once
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
#include <glog/logging.h>
#include "utils/future.hpp"
namespace utils {
@ -21,44 +9,4 @@ namespace utils {
/// Beware, the name length limit is 16 characters!
void ThreadSetName(const std::string &name);
/// A thread pool for asynchronous task execution. Supports tasks that produce
/// return values by returning `utils::Future` objects.
class ThreadPool final {
public:
/// Creates a thread pool with the given number of threads.
ThreadPool(size_t threads, const std::string &name);
~ThreadPool();
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
ThreadPool &operator=(ThreadPool &&) = delete;
/// Runs the given callable with the given args, asynchronously. This function
/// immediately returns an `utils::Future` with the result, to be
/// consumed when ready.
template <class TCallable, class... TArgs>
auto Run(TCallable &&callable, TArgs &&... args) {
auto task = std::make_shared<
std::packaged_task<std::result_of_t<TCallable(TArgs...)>()>>(std::bind(
std::forward<TCallable>(callable), std::forward<TArgs>(args)...));
auto res = utils::make_future(task->get_future());
std::unique_lock<std::mutex> lock(mutex_);
CHECK(!stop_) << "ThreadPool::Run called on stopped ThreadPool.";
tasks_.emplace([task]() { (*task)(); });
lock.unlock();
cvar_.notify_one();
return res;
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable cvar_;
bool stop_{false};
};
}; // namespace utils

View File

@ -1,71 +0,0 @@
#include "watchdog.hpp"
#include <atomic>
#include <chrono>
#include <functional>
#include <random>
#include <thread>
#include "glog/logging.h"
using std::chrono::milliseconds;
using std::chrono::steady_clock;
namespace utils {
Watchdog::Watchdog(const milliseconds &min_timeout,
const milliseconds &max_timeout,
const std::function<void()> &callback, bool blocked)
: min_timeout_(min_timeout),
max_timeout_(max_timeout),
generator_(std::random_device{}()),
distribution_(min_timeout.count(), max_timeout_.count()),
callback_(callback),
draining_(false),
blocked_(blocked) {
DCHECK(min_timeout_ <= max_timeout_)
<< "Min timeout should be less than max timeout";
Notify();
thread_ = std::thread([this]() { Run(); });
}
Watchdog::~Watchdog() {
draining_ = true;
if (thread_.joinable()) {
thread_.join();
}
}
void Watchdog::Notify() {
std::lock_guard<std::mutex> guard(mutex_);
callback_threshold_ =
steady_clock::now() + milliseconds(distribution_(generator_));
}
void Watchdog::Block() { blocked_ = true; }
void Watchdog::Unblock() {
if (blocked_) {
Notify();
}
blocked_ = false;
}
void Watchdog::Run() {
while (!draining_) {
steady_clock::time_point t;
{
std::lock_guard<std::mutex> guard(mutex_);
t = callback_threshold_;
}
if (steady_clock::now() > t) {
if (!blocked_) {
callback_();
}
Notify();
}
std::this_thread::sleep_until(t);
}
}
} // namespace utils

View File

@ -1,60 +0,0 @@
#pragma once
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <mutex>
#include <random>
#include <thread>
namespace utils {
/**
* @brief - Keeps track of how long it's been since `Notify` method was called.
* If it wasn't called for a sufficiently long time interval (randomly chosen
* between `min_timeout` and `max_timeout`), the watchdog will periodically call
* `callback` until it is notified or destroyed. If `blocked` is set to true,
* watchdog will be blocked on startup.
*/
class Watchdog {
public:
Watchdog(const std::chrono::milliseconds &min_timeout,
const std::chrono::milliseconds &max_timeout,
const std::function<void()> &callback, bool blocked = false);
~Watchdog();
Watchdog(Watchdog &&) = delete;
Watchdog(const Watchdog &) = delete;
Watchdog &operator=(Watchdog &&) = delete;
Watchdog &operator=(const Watchdog &) = delete;
void Notify();
/** Calling `Block` is equivalent to continuously calling `Notify`
* until `Unblock` is called.
*/
void Block();
void Unblock();
private:
void Run();
std::chrono::milliseconds min_timeout_;
std::chrono::milliseconds max_timeout_;
std::mutex mutex_;
// Used to generate callback timeouts.
std::mt19937 generator_;
std::uniform_int_distribution<int> distribution_;
std::chrono::steady_clock::time_point callback_threshold_;
std::function<void()> callback_;
// Used to notify the watchdog loop it should stop.
std::atomic<bool> draining_;
std::atomic<bool> blocked_;
std::thread thread_;
};
} // namespace utils

View File

@ -55,7 +55,6 @@ std::optional<communication::rpc::Server> server;
std::optional<communication::ClientContext> client_context;
std::optional<communication::rpc::Client> clients[kThreadsNum];
std::optional<communication::rpc::ClientPool> client_pool;
std::optional<utils::ThreadPool> thread_pool;
static void BenchmarkRpc(benchmark::State &state) {
std::string data(state.range(0), 'a');
@ -73,15 +72,6 @@ static void BenchmarkRpcPool(benchmark::State &state) {
state.SetItemsProcessed(state.iterations());
}
static void BenchmarkRpcPoolAsync(benchmark::State &state) {
std::string data(state.range(0), 'a');
while (state.KeepRunning()) {
auto future = thread_pool->Run([&data] { client_pool->Call<Echo>(data); });
future.get();
}
state.SetItemsProcessed(state.iterations());
}
BENCHMARK(BenchmarkRpc)
->RangeMultiplier(4)
->Range(4, 1 << 13)
@ -96,13 +86,6 @@ BENCHMARK(BenchmarkRpcPool)
->Unit(benchmark::kNanosecond)
->UseRealTime();
BENCHMARK(BenchmarkRpcPoolAsync)
->RangeMultiplier(4)
->Range(4, 1 << 13)
->ThreadRange(1, kThreadsNum)
->Unit(benchmark::kNanosecond)
->UseRealTime();
int main(int argc, char **argv) {
::benchmark::Initialize(&argc, argv);
gflags::AllowCommandLineReparsing();
@ -160,8 +143,6 @@ int main(int argc, char **argv) {
threads[i].join();
}
thread_pool.emplace(kThreadsNum, "RPC client");
std::this_thread::sleep_for(std::chrono::milliseconds(200));
::benchmark::RunSpecifiedBenchmarks();

View File

@ -175,9 +175,6 @@ target_link_libraries(${test_prefix}socket mg-io)
add_unit_test(utils_algorithm.cpp)
target_link_libraries(${test_prefix}utils_algorithm mg-utils)
add_unit_test(utils_demangle.cpp)
target_link_libraries(${test_prefix}utils_demangle mg-utils)
add_unit_test(utils_exceptions.cpp)
target_link_libraries(${test_prefix}utils_exceptions mg-utils)
@ -211,15 +208,9 @@ target_link_libraries(${test_prefix}utils_string mg-utils)
add_unit_test(utils_synchronized.cpp)
target_link_libraries(${test_prefix}utils_synchronized mg-utils)
add_unit_test(utils_thread_pool.cpp)
target_link_libraries(${test_prefix}utils_thread_pool mg-utils)
add_unit_test(utils_timestamp.cpp)
target_link_libraries(${test_prefix}utils_timestamp mg-utils)
add_unit_test(utils_watchdog.cpp)
target_link_libraries(${test_prefix}utils_watchdog mg-utils)
# Test mg-auth
add_unit_test(auth.cpp)

View File

@ -1,23 +0,0 @@
#include "glog/logging.h"
#include "gtest/gtest.h"
#include "utils/demangle.hpp"
using utils::Demangle;
struct DummyStruct {};
template <typename T>
class DummyClass {};
TEST(Demangle, Demangle) {
int x;
char *s;
DummyStruct t;
DummyClass<int> c;
EXPECT_EQ(*Demangle(typeid(x).name()), "int");
EXPECT_EQ(*Demangle(typeid(s).name()), "char*");
EXPECT_EQ(*Demangle(typeid(t).name()), "DummyStruct");
EXPECT_EQ(*Demangle(typeid(c).name()), "DummyClass<int>");
}

View File

@ -1,41 +0,0 @@
#include <chrono>
#include <memory>
#include <unordered_set>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "utils/future.hpp"
#include "utils/thread.hpp"
#include "utils/timer.hpp"
TEST(ThreadPool, RunMany) {
utils::ThreadPool tp(10, "Test");
const int kResults = 10000;
std::vector<utils::Future<int>> results;
for (int i = 0; i < kResults; ++i) {
results.emplace_back(tp.Run([i]() { return i; }));
}
std::unordered_set<int> result_set;
for (auto &result : results) result_set.insert(result.get());
EXPECT_EQ(result_set.size(), kResults);
}
TEST(ThreadPool, EnsureParallel) {
using namespace std::chrono_literals;
const int kSize = 10;
utils::ThreadPool tp(kSize, "Test");
std::vector<utils::Future<void>> results;
utils::Timer t;
for (int i = 0; i < kSize; ++i) {
results.emplace_back(tp.Run([]() { std::this_thread::sleep_for(50ms); }));
}
for (auto &res : results) res.wait();
auto elapsed = t.Elapsed();
EXPECT_GE(elapsed, 30ms);
EXPECT_LE(elapsed, 200ms);
}

View File

@ -1,59 +0,0 @@
#include <atomic>
#include "gtest/gtest.h"
#include "utils/watchdog.hpp"
using namespace std::chrono_literals;
TEST(Watchdog, Run) {
std::atomic<int> count(0);
utils::Watchdog dog(200ms, 200ms, [&count]() { ++count; });
std::this_thread::sleep_for(250ms);
EXPECT_EQ(count, 1);
std::this_thread::sleep_for(200ms);
EXPECT_EQ(count, 2);
std::this_thread::sleep_for(50ms);
dog.Notify();
std::this_thread::sleep_for(150ms);
EXPECT_EQ(count, 2);
dog.Notify();
std::this_thread::sleep_for(250ms);
EXPECT_EQ(count, 3);
}
TEST(Watchdog, Blocker) {
std::atomic<int> count(0);
utils::Watchdog dog(200ms, 200ms, [&count]() { ++count; });
std::this_thread::sleep_for(250ms);
EXPECT_EQ(count, 1);
dog.Block();
std::this_thread::sleep_for(200ms);
EXPECT_EQ(count, 1);
std::this_thread::sleep_for(200ms);
EXPECT_EQ(count, 1);
dog.Unblock();
std::this_thread::sleep_for(150ms);
EXPECT_EQ(count, 1);
std::this_thread::sleep_for(100ms);
EXPECT_EQ(count, 2);
dog.Notify();
std::this_thread::sleep_for(100ms);
dog.Unblock();
std::this_thread::sleep_for(150ms);
EXPECT_EQ(count, 3);
}