memgraph/include/utils/fswatcher.hpp
2017-01-12 19:42:52 +01:00

433 lines
12 KiB
C++

/**
* Memgraph Ltd
*
* File System Watcher
*
* @author Marko Budiselic
*/
#pragma once
#include <atomic>
#include <chrono>
#include <errno.h>
#include <limits.h>
#include <mutex>
#include <sys/inotify.h>
#include <thread>
#include <unistd.h>
#include <vector>
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
#include "logging/loggable.hpp"
#include "utils/algorithm.hpp"
#include "utils/assert.hpp"
#include "utils/exceptions/basic_exception.hpp"
#include "utils/linux.hpp"
#include "utils/underlying_cast.hpp"
namespace utils
{
using ms = std::chrono::milliseconds;
/**
* watch descriptor type, on linux it is int
*/
using os_wd_t = int;
/**
* machine event mask type, on linux it is inotify mask type which is int
*/
using os_mask_t = int;
// NOTE: for now the types are linux dependent, once when the code base will be
// compiled for another OS figure out the most appropriate solution
/**
* inotify buffer length (10 events)
*/
using in_event_t = struct inotify_event;
constexpr uint64_t IN_HEADER_SIZE = sizeof(struct inotify_event);
/**
* The reason why here is 10 is because the memory space for the data
* has to be upfront reserved (C API). I've picked up 10 because it seems like
* a reasonable size and doesn't have to be configurable before compile or run
* time.
*/
constexpr uint64_t IN_BUFF_LEN = 10 * (IN_HEADER_SIZE + NAME_MAX + 1);
/**
* File System Event Type - abstraction for underlying event types
*/
enum class FSEventType : os_mask_t
{
Created = IN_CREATE,
Modified = IN_MODIFY,
Deleted = IN_DELETE,
All = Created | Modified | Deleted
};
/**
* @struct FSEventBase
*
* Base DTO object.
*
* In derived classes the path can represent eather directory or path to a file.
* In derived classes the type can represent a set of event types or specific
* event.
*/
struct FSEventBase
{
FSEventBase(const fs::path &path, const FSEventType type)
: path(path), type(type)
{
}
fs::path path;
FSEventType type;
operator os_mask_t() { return underlying_cast(type); }
};
/**
* @struct WatchDescriptor
*
* The purpose of this struct is to register new watchers for specific
* directory and event type.
*/
struct WatchDescriptor : public FSEventBase
{
WatchDescriptor(const fs::path &directory, const FSEventType type)
: FSEventBase(directory, type)
{
runtime_assert(fs::is_directory(path),
"The path parameter should be directory");
}
};
/**
* @struct FSEvent
*
* The purpose of this struct is to carry information about a new fs event.
* In this case path is a path to the affected file and type is the event type.
*/
struct FSEvent : public FSEventBase
{
FSEvent(const fs::path &directory, const fs::path &filename,
const FSEventType type)
: FSEventBase(directory / filename, type)
{
}
};
/**
* Custom FSWatcher Exception
*/
class FSWatcherException : public BasicException
{
public:
using BasicException::BasicException;
};
/**
* File System Watcher
*
* The idea is to create wrapper for inotify or any other file system
* notification system.
*
* The object is not thread safe.
*
* parameters:
* * interval - time between two checks for the new file system events
*/
class FSWatcher : public Loggable
{
/**
* callback type (the code that will be notified will be notified
* through callback of this type
*/
using callback_t = std::function<void(FSEvent event)>;
public:
/**
* Initialize underlying notification system.
*/
FSWatcher(ms check_interval = ms(100))
: Loggable("FSWatcher"), check_interval_(check_interval)
{
inotify_fd_ = inotify_init();
if (inotify_fd_ == -1)
throw FSWatcherException("Unable to initialize inotify\n");
linux_os::set_non_blocking(inotify_fd_);
}
~FSWatcher()
{
logger.debug("destructor call");
unwatchAll();
}
/*
* copy and move constructors and assignemnt operators are deleted because
* std::atomic can't be copied or moved
*/
FSWatcher(const FSWatcher &other) = delete;
FSWatcher(FSWatcher &&other) = delete;
FSWatcher &operator=(const FSWatcher &) = delete;
FSWatcher &operator=(FSWatcher &&) = delete;
/**
* Add Watcher
*/
void watch(WatchDescriptor descriptor, callback_t callback)
{
stop();
os_wd_t wd =
inotify_add_watch(inotify_fd_, descriptor.path.c_str(), descriptor);
if (wd == -1)
{
switch (errno)
{
case EACCES:
throw FSWatcherException("Unable to add watcher. Read access "
"to the given file is not permitted.");
case EBADF:
throw FSWatcherException("Unable to add watcher. The given "
"file descriptor is not valid.");
case EFAULT:
throw FSWatcherException("Unable to add watcher. pathname "
"points outside of the process's "
"accessible address space");
case EINVAL:
throw FSWatcherException(
"Unable to add watcher. The given event mask contains no "
"valid events; or fd is not an inotify file descriptor.");
case ENAMETOOLONG:
throw FSWatcherException(
"Unable to add watcher. pathname is too long.");
case ENOENT:
throw FSWatcherException("Unable to add watcher. A directory "
"component in pathname does not exist "
"or is a dangling symbolic link.");
case ENOMEM:
throw FSWatcherException("Unable to add watcher. Insufficient "
"kernel memory was available.");
case ENOSPC:
throw FSWatcherException(
"Unable to add watcher. The user limit on the total number "
"of inotify watches was reached or the kernel failed to "
"allocate a needed resource.");
default:
throw FSWatcherException(
"Unable to add watcher. Unknown Linux API error.");
}
}
// update existing
auto it =
std::find_if(entries_.begin(), entries_.end(),
[wd](Entry &entry) { return wd == entry.os_wd; });
if (it != entries_.end())
{
it->descriptor = descriptor;
it->callback = callback;
}
else
{
entries_.emplace_back(Entry(wd, descriptor, callback));
}
logger.debug("REGISTERED: wd({}) for path({}) and mask ({})", wd,
descriptor.path.c_str(), (os_mask_t)(descriptor));
start();
}
/**
* Remove subscriber on specified path and type.
*
* Time complexity: O(n) (where n is number of entries)
*/
void unwatch(WatchDescriptor descriptor)
{
stop();
auto it = action_remove_if(
entries_.begin(), entries_.end(),
[&descriptor](Entry entry) {
auto stored_descriptor = entry.descriptor;
if (stored_descriptor.path != descriptor.path) return false;
if (stored_descriptor.type != descriptor.type) return false;
return true;
},
[this](Entry entry) { remove_underlaying_watcher(entry); });
if (it != entries_.end()) entries_.erase(it);
if (entries_.size() > 0) start();
}
/**
* Removes all subscribers and stops the watching process.
*/
void unwatchAll()
{
stop();
if (entries_.size() <= 0) return;
entries_.erase(action_remove_if(
entries_.begin(), entries_.end(), [](Entry) { return true; },
[this](Entry &entry) { remove_underlaying_watcher(entry); }));
}
/**
* Start the watching process.
*/
void start()
{
is_running_.store(true);
// run separate thread
dispatch_thread_ = std::thread([this]() {
logger.debug("dispatch thread - start");
while (is_running_.load())
{
std::this_thread::sleep_for(check_interval_);
// read file descriptor and process new events
// the read call should be non blocking otherwise
// this thread can easily be blocked forever
auto n = ::read(inotify_fd_, buffer_, IN_BUFF_LEN);
if (n == 0) throw FSWatcherException("read() -> 0.");
if (n == -1) continue;
logger.info("Read {} bytes from inotify fd", (long)n);
// process all of the events in buffer returned by read()
for (auto p = buffer_; p < buffer_ + n;)
{
// get in_event
auto in_event = reinterpret_cast<in_event_t *>(p);
auto in_event_length = IN_HEADER_SIZE + in_event->len;
// skip if in_event is undefined OR is equal to IN_IGNORED
if ((in_event->len == 0 && in_event->mask == 0) ||
in_event->mask == IN_IGNORED)
{
p += in_event_length;
continue;
}
logger.info("LEN: {}, MASK: {}, NAME: {}", in_event_length,
in_event->mask, in_event->name);
// find our watch descriptor
auto entry = find_if(entries_.begin(), entries_.end(),
[in_event](Entry &entry) {
return entry.os_wd == in_event->wd;
});
auto &descriptor = entry->descriptor;
// call user's callback
entry->callback(
FSEvent(descriptor.path, fs::path(in_event->name),
static_cast<FSEventType>(in_event->mask)));
// move on
p += in_event_length;
}
}
logger.debug("dispatch thread - finish");
});
}
/**
* Stop the watching process.
*/
void stop()
{
if (is_running_.load())
{
is_running_.store(false);
dispatch_thread_.join();
}
}
/**
* @return check interval - time between two underlaying check calls
*/
ms check_interval() const { return check_interval_; }
/**
* @return number of entries
*/
size_t size() const { return entries_.size(); }
private:
/**
* Internal storage for all subscribers.
*
* <os watch descriptor, API watch descriptor, callback>
*/
struct Entry
{
Entry(os_wd_t os_wd, WatchDescriptor descriptor, callback_t callback)
: os_wd(os_wd), descriptor(descriptor), callback(callback)
{
}
os_wd_t os_wd;
WatchDescriptor descriptor;
callback_t callback;
};
/**
* Removes the os specific watch descriptor.
*/
void remove_underlaying_watcher(Entry entry)
{
auto status = inotify_rm_watch(inotify_fd_, entry.os_wd);
if (status == -1)
throw FSWatcherException("Unable to remove underlaying watch.");
else
logger.info("UNREGISTER: fd({}), wd({}), status({})", inotify_fd_,
entry.os_wd, status);
}
/**
* inotify file descriptor
*/
int inotify_fd_;
/**
* running flag, has to be atomic because two threads can update and read
* the value
*/
std::atomic<bool> is_running_;
/**
* interval between the end of events processing and next start of
* processing
*/
ms check_interval_;
/**
*/
std::vector<Entry> entries_;
/**
* thread for events processing
*/
std::thread dispatch_thread_;
/**
* buffer for underlying events (inotify dependent)
*/
char *buffer_[IN_BUFF_LEN];
};
}