Much more serious implementation of FSWatcher

Summary: Much more serious implementation of FSWatcher

Test Plan: ctest -R fswatcher

Reviewers: dtomicevic, sale

Subscribers: buda

Differential Revision: https://phabricator.memgraph.io/D33
This commit is contained in:
Marko Budiselic 2017-01-08 01:03:12 +01:00
parent 782fc05f93
commit 057af7ac14
7 changed files with 604 additions and 127 deletions

View File

@ -188,17 +188,23 @@ if (SYNC_LOGGER)
endif()
# -----------------------------------------------------------------------------
# assert
# custom assert control parameters
# Runtime assert, if value is OFF runtime asserts will be inactive during
# runtime. Default value is ON.
option(RUNTIME_ASSERT "Enable runtime assertions" ON)
message(STATUS "RUNTIME_ASSERT: ${RUNTIME_ASSERT}")
if(RUNTIME_ASSERT)
add_definitions(-DRUNTIME_ASSERT_ON)
endif()
option(THROW_EXCEPTION_ON_ERROR "Throw exception on error" ON)
message(STATUS "THROW_EXCEPTION_ON_ERROR: ${THROW_EXCEPTION_ON_ERROR}")
if(THROW_EXCEPTION_ON_ERROR)
add_definitions(-DTHROW_EXCEPTION_ON_ERROR)
# by default on custom assert only the message, filename and line number will be
# printed on stderr, if STACKTRACE_ASSERT is ON the whole stacktrace is going to
# be printed on stderr
option(STACKTRACE_ASSERT "Dump stacktrace on custom assert" OFF)
message(STATUS "STACKTRACE_ASSERT: ${STACKTRACE_ASSERT}")
if(STACKTRACE_ASSERT)
add_definitions(-DSTACKTRACE_ASSERT_ON)
endif()
# -----------------------------------------------------------------------------

View File

@ -1,10 +1,10 @@
#pragma once
#include <cassert>
#include <fmt/format.h>
#include "logging/log.hpp"
#include "logging/levels.hpp"
#include "utils/assert.hpp"
class Logger
{
@ -54,7 +54,7 @@ public:
template <class Level, class... Args>
void emit(Args&&... args)
{
assert(log != nullptr);
runtime_assert(log != nullptr, "Log object has to be defined.");
auto message = std::make_unique<Message<Level>>(
Timestamp::now(), name, fmt::format(std::forward<Args>(args)...)

View File

@ -0,0 +1,27 @@
#pragma once
#include <algorithm>
#include "logging/default.hpp"
/**
* Goes from first to last item in a container, if an element satisfying the
* predicate then the action is going to be executed and the element is going
* to be shifted to the end of the container.
*
* @tparam ForwardIt type of forward iterator
* @tparam UnaryPredicate type of predicate
* @tparam Action type of action
*
* @return a past-the-end iterator for the new end of the range
*/
template <class ForwardIt, class UnaryPredicate, class Action>
ForwardIt action_remove_if(ForwardIt first, ForwardIt last, UnaryPredicate p,
Action a)
{
auto it = std::remove_if(first, last, p);
if (it == last)
return it;
std::for_each(it, last, a);
return it;
}

View File

@ -1,40 +1,53 @@
/**
* Permanent Assert -> always active
* Runtime Assert -> active only if RUNTIME_ASSERT_ON is present
*/
#pragma once
#include <iostream>
#include <sstream>
#include "utils/exceptions/basic_exception.hpp"
#include "utils/stacktrace/stacktrace.hpp"
// #define THROW_EXCEPTION_ON_ERROR
// #define RUNTIME_ASSERT_ON
/**
* if STACKTRACE_ASSERT_ON is defined the full stacktrace will be printed on
* stderr otherwise just the basic information will be printed out (message,
* file, line)
*/
#ifdef STACKTRACE_ASSERT_ON
#define __handle_assert_message(message) \
Stacktrace stacktrace; \
std::cerr << "ASSERT: " << message << std::endl; \
std::cerr << stacktrace.dump();
#else
#define __handle_assert_message(message) \
std::cerr << "ASSERT: " << message << " In file " << __FILE__ << " #" \
<< __LINE__ << std::endl;
#endif
// // handle assertion error
// void assert_error_handler_(const char *file_name, unsigned line_number,
// const char *message)
// {
// // this is a good place to put your debug breakpoint
// // and add some other destination for error message
// #ifdef THROW_EXCEPTION_ON_ERROR
// throw BasicException(message);
// #else
// std::cerr << message << " in file " << file_name << " #" << line_number
// << std::endl;
// exit(1);
// #endif
// }
// parmanant exception will always be executed
/**
* parmanant assertion will always be active
* when condition is satisfied program has to exit
*
* a good use-case for this type of assert is during unit testing because
* assert has to stay active all the time
*/
#define permanent_assert(condition, message) \
if (!(condition)) \
{ \
std::ostringstream s; \
s << message; \
std::cout << s.str() << std::endl; \
__handle_assert_message(s.str()); \
std::exit(EXIT_FAILURE); \
}
// assert_error_handler_(__FILE__, __LINE__, s.str().c_str());
// runtime exception
/**
* runtime assertion is more like standart C assert but with custom
* define which controls when the assertion will be active
*
* could be used wherever the standard C assert is used but
* the user should not forget about RUNTIME_ASSERT_ON define
*/
#ifdef RUNTIME_ASSERT_ON
#define runtime_assert(condition, message) permanent_assert(condition, message)
#else

View File

@ -1,36 +1,121 @@
/**
* Memgraph Ltd
*
* File System Watcher
*
* @author Marko Budiselic
*/
#pragma once
#include <vector>
#include <atomic>
#include <sys/inotify.h>
#include <chrono>
#include <errno.h>
#include <limits.h>
#include <mutex>
#include <sys/inotify.h>
#include <thread>
// TODO: remove experimental from here once that becomes possible
#include <unistd.h>
#include <vector>
#include <experimental/filesystem>
#include "utils/exceptions/basic_exception.hpp"
#include "utils/exceptions/not_yet_implemented.hpp"
// TODO: remove experimental from here once it becomes possible
namespace fs = std::experimental::filesystem;
#include "logging/loggable.hpp"
#include "utils/algorithm.hpp"
#include "utils/assert.hpp"
#include "utils/exceptions/basic_exception.hpp"
#include "utils/linux.hpp"
#include "utils/underlying_cast.hpp"
namespace utils
{
using ms = std::chrono::milliseconds;
/*
* File System Event Types
/**
* watch descriptor type, on linux it is int
*/
enum class FSEvent : int
using os_wd_t = int;
/**
* machine event mask type, on linux it is inotify mask type which is int
*/
using os_mask_t = int;
// NOTE: for now the types are linux dependent, once when the code base will be
// compiled for another OS figure out the most appropriate solution
/**
* inotify buffer length (10 events)
*/
using in_event_t = struct inotify_event;
constexpr uint64_t IN_HEADER_SIZE = sizeof(struct inotify_event);
constexpr uint64_t IN_BUFF_LEN = 10 * (IN_HEADER_SIZE + NAME_MAX + 1);
/**
* File System Event Type - abstraction for underlying event types
*/
enum class FSEventType : os_mask_t
{
Created = 0x1,
Modified = 0x2,
Deleted = 0x4
Created = IN_CREATE,
Modified = IN_MODIFY,
Deleted = IN_DELETE,
All = Created | Modified | Deleted
};
/*
* Custom exception
/**
* @struct FSEventBase
*
* Base DTO object.
*
* In derived classes the path can represent eather directory or path to a file.
* In derived classes the type can represent a set of event types or specific
* event.
*/
struct FSEventBase
{
FSEventBase(const fs::path &path, const FSEventType type)
: path(path), type(type)
{
}
fs::path path;
FSEventType type;
operator os_mask_t() { return underlying_cast(type); }
};
/**
* @struct WatchDescriptor
*
* The purpose of this struct is to register new watchers for specific
* directory and event type.
*/
struct WatchDescriptor : public FSEventBase
{
WatchDescriptor(const fs::path &directory, const FSEventType type)
: FSEventBase(directory, type)
{
runtime_assert(fs::is_directory(path),
"The path parameter should be directory");
}
};
/**
* @struct FSEvent
*
* The purpose of this struct is to carry information about a new fs event.
* In this case path is a path to the affected file and type is the event type.
*/
struct FSEvent : public FSEventBase
{
FSEvent(const fs::path &directory, const fs::path &filename,
const FSEventType type)
: FSEventBase(directory / filename, type)
{
}
};
/**
* Custom FSWatcher Exception
*/
class FSWatcherException : public BasicException
{
@ -38,129 +123,304 @@ public:
using BasicException::BasicException;
};
/*
/**
* File System Watcher
*
* The idea is to create wrapper of inotify or any other file system
* notificatino system.
* The idea is to create wrapper for inotify or any other file system
* notification system.
*
* The object is not thread safe.
*
* parameters:
* * interval - time between two check for the new file system events
* * interval - time between two checks for the new file system events
*/
class FSWatcher
class FSWatcher : public Loggable
{
// watch descriptor type
using wd_t = int;
// callback type (the code that will be notified will be notified
// through callback of this type
using callback_t = std::function<void(fs::path &path, FSEvent event)>;
// storage type for all subscribers
// <path, watch descriptor, callback>
using entry_t = std::tuple<fs::path, wd_t, callback_t>;
/**
* callback type (the code that will be notified will be notified
* through callback of this type
*/
using callback_t = std::function<void(FSEvent event)>;
public:
FSWatcher(ms interval = ms(100)) : interval_(interval) { init(); }
~FSWatcher() = default;
/**
* Initialize underlying notification system.
*/
FSWatcher(ms check_interval = ms(100))
: Loggable("FSWatcher"), check_interval_(check_interval)
{
inotify_fd_ = inotify_init();
if (inotify_fd_ == -1)
throw FSWatcherException("Unable to initialize inotify\n");
os_linux::set_non_blocking(inotify_fd_);
}
// copy and move constructors and assignemnt operators are deleted because
// std::atomic can't be copied or moved
~FSWatcher()
{
logger.debug("destructor call");
unwatchAll();
}
/*
* copy and move constructors and assignemnt operators are deleted because
* std::atomic can't be copied or moved
*/
FSWatcher(const FSWatcher &other) = delete;
FSWatcher(FSWatcher &&other) = delete;
FSWatcher &operator=(const FSWatcher &) = delete;
FSWatcher &operator=(FSWatcher &&) = delete;
/*
* Initialize file system notification system.
/**
* Add Watcher
*/
void init()
void watch(WatchDescriptor descriptor, callback_t callback)
{
inotify_fd_ = inotify_init();
if (inotify_fd_ == -1)
throw FSWatcherException("Unable to initialize inotify");
}
/*
* Add subscriber
*
* parameters:
* * path: path to a file which will be monitored
* * type: type of events
* * callback: callback that will be called on specified event type
*/
void watch(const fs::path &path, FSEvent, callback_t callback)
{
// TODO: instead IN_ALL_EVENTS pass FSEvent
int wd = inotify_add_watch(inotify_fd_, path.c_str(), IN_ALL_EVENTS);
stop();
os_wd_t wd =
inotify_add_watch(inotify_fd_, descriptor.path.c_str(), descriptor);
if (wd == -1)
throw FSWatcherException("Unable to add watch");
entries_.emplace_back(std::make_tuple(path, wd, callback));
}
/*
* Remove subscriber on specified path and type
*/
void unwatch(const fs::path &path, FSEvent)
{
// iterate through all entries and remove specified watch descriptor
for (auto &entry : entries_)
{
// if paths are not equal pass
if (std::get<fs::path>(entry) != path)
continue;
// get watch descriptor and remove it from the watching
auto status = inotify_rm_watch(inotify_fd_, std::get<wd_t>(entry));
if (status == -1)
throw FSWatcherException("Unable to remove watch");
switch (errno)
{
case EACCES:
throw FSWatcherException("Unable to add watcher. Read access "
"to the given file is not permitted.");
case EBADF:
throw FSWatcherException("Unable to add watcher. The given "
"file descriptor is not valid.");
case EFAULT:
throw FSWatcherException("Unable to add watcher. pathname "
"points outside of the process's "
"accessible address space");
case EINVAL:
throw FSWatcherException(
"Unable to add watcher. The given event mask contains no "
"valid events; or fd is not an inotify file descriptor.");
case ENAMETOOLONG:
throw FSWatcherException(
"Unable to add watcher. pathname is too long.");
case ENOENT:
throw FSWatcherException("Unable to add watcher. A directory "
"component in pathname does not exist "
"or is a dangling symbolic link.");
case ENOMEM:
throw FSWatcherException("Unable to add watcher. Insufficient "
"kernel memory was available.");
case ENOSPC:
throw FSWatcherException(
"Unable to add watcher. The user limit on the total number "
"of inotify watches was reached or the kernel failed to "
"allocate a needed resource.");
default:
throw FSWatcherException(
"Unable to add watcher. Unknown Linux API error.");
}
}
// update existing
auto it =
std::find_if(entries_.begin(), entries_.end(),
[wd](Entry &entry) { return wd == entry.os_wd; });
if (it != entries_.end())
{
it->descriptor = descriptor;
it->callback = callback;
}
else
{
entries_.emplace_back(Entry(wd, descriptor, callback));
}
logger.debug("REGISTERED: wd({}) for path({}) and mask ({})", wd,
descriptor.path.c_str(), (os_mask_t)(descriptor));
start();
}
/*
* Remove all subscribers.
/**
* Remove subscriber on specified path and type.
*
* Time complexity: O(n) (where n is number of entries)
*/
void unwatch(WatchDescriptor descriptor)
{
stop();
auto it = action_remove_if(
entries_.begin(), entries_.end(),
[&descriptor](Entry entry) {
auto stored_descriptor = entry.descriptor;
if (stored_descriptor.path != descriptor.path) return false;
if (stored_descriptor.type != descriptor.type) return false;
return true;
},
[this](Entry entry) { remove_underlaying_watcher(entry); });
if (it != entries_.end()) entries_.erase(it);
if (entries_.size() > 0) start();
}
/**
* Removes all subscribers and stops the watching process.
*/
void unwatchAll()
{
// iterate through all entries and remove all watch descriptors
for (auto &entry : entries_)
{
auto status = inotify_rm_watch(inotify_fd_, std::get<wd_t>(entry));
if (status == -1)
throw FSWatcherException("Unable to remove watch");
}
stop();
if (entries_.size() <= 0) return;
entries_.erase(action_remove_if(
entries_.begin(), entries_.end(), [](Entry) { return true; },
[this](Entry &entry) { remove_underlaying_watcher(entry); }));
}
/*
* Start watching
/**
* Start the watching process.
*/
void start()
{
throw NotYetImplemented("FSWatch::start");
is_running_.store(true);
// run separate thread
dispatch_thread_ = std::thread([this]() {
while (is_running_.load()) {
std::this_thread::sleep_for(interval_);
// TODO implement file system event processing
logger.debug("dispatch thread - start");
while (is_running_.load())
{
std::this_thread::sleep_for(check_interval_);
// read file descriptor and process new events
// the read call should be non blocking otherwise
// this thread can easily be blocked forever
auto n = ::read(inotify_fd_, buffer_, IN_BUFF_LEN);
if (n == 0) throw FSWatcherException("read() -> 0.");
if (n == -1) continue;
logger.info("Read {} bytes from inotify fd", (long)n);
// process all of the events in buffer returned by read()
for (auto p = buffer_; p < buffer_ + n;)
{
// get in_event
auto in_event = reinterpret_cast<in_event_t *>(p);
auto in_event_length = IN_HEADER_SIZE + in_event->len;
// skip if in_event is undefined OR is equal to IN_IGNORED
if ((in_event->len == 0 && in_event->mask == 0) ||
in_event->mask == IN_IGNORED)
{
p += in_event_length;
continue;
}
logger.info("LEN: {}, MASK: {}, NAME: {}", in_event_length,
in_event->mask, in_event->name);
// find our watch descriptor
auto entry = find_if(entries_.begin(), entries_.end(),
[in_event](Entry &entry) {
return entry.os_wd == in_event->wd;
});
auto &descriptor = entry->descriptor;
// call user's callback
entry->callback(
FSEvent(descriptor.path, fs::path(in_event->name),
static_cast<FSEventType>(in_event->mask)));
// move on
p += in_event_length;
}
}
logger.debug("dispatch thread - finish");
});
}
/*
* Stop watching
/**
* Stop the watching process.
*/
void stop()
{
is_running_.store(false);
if (is_running_.load())
{
is_running_.store(false);
dispatch_thread_.join();
}
}
/**
* @return check interval - time between two underlaying check calls
*/
ms check_interval() const { return check_interval_; }
/**
* @return number of entries
*/
size_t size() const { return entries_.size(); }
private:
/**
* Internal storage for all subscribers.
*
* <os watch descriptor, API watch descriptor, callback>
*/
struct Entry
{
Entry(os_wd_t os_wd, WatchDescriptor descriptor, callback_t callback)
: os_wd(os_wd), descriptor(descriptor), callback(callback)
{
}
os_wd_t os_wd;
WatchDescriptor descriptor;
callback_t callback;
};
/**
* Removes the os specific watch descriptor.
*/
void remove_underlaying_watcher(Entry entry)
{
auto status = inotify_rm_watch(inotify_fd_, entry.os_wd);
if (status == -1)
throw FSWatcherException("Unable to remove underlaying watch.");
else
logger.info("UNREGISTER: fd({}), wd({}), status({})", inotify_fd_,
entry.os_wd, status);
}
/**
* inotify file descriptor
*/
int inotify_fd_;
/**
* running flag, has to be atomic because two threads can update and read
* the value
*/
std::atomic<bool> is_running_;
ms interval_;
std::vector<entry_t> entries_;
/**
* interval between the end of events processing and next start of
* processing
*/
ms check_interval_;
/**
*/
std::vector<Entry> entries_;
/**
* thread for events processing
*/
std::thread dispatch_thread_;
/**
* buffer for underlying events (inotify dependent)
*/
char *buffer_[IN_BUFF_LEN];
};
}

54
include/utils/linux.hpp Normal file
View File

@ -0,0 +1,54 @@
#pragma once
// ** Books **
// http://instructor.sdu.edu.kz/~konst/sysprog2015fall/readings/linux%20system%20programming/The%20Linux%20Programming%20Interface-Michael%20Kerrisk.pdf
// ** Documentation **
// http://man7.org/linux/man-pages/man2/read.2.html
// http://man7.org/linux/man-pages/man2/select.2.html
// http://man7.org/linux/man-pages/man2/fcntl.2.html
// ** Community **
// http://stackoverflow.com/questions/5616092/non-blocking-call-for-reading-descriptor
// http://stackoverflow.com/questions/2917881/how-to-implement-a-timeout-in-read-function-call
#include <unistd.h>
#include <fcntl.h>
#include "utils/exceptions/basic_exception.hpp"
#include "utils/exceptions/not_yet_implemented.hpp"
#include "utils/likely.hpp"
namespace os_linux
{
class LinuxException : public BasicException
{
using BasicException::BasicException;
};
/**
* Sets non blocking flag to a file descriptor.
*/
void set_non_blocking(int fd)
{
auto flags = fcntl(fd, F_GETFL, 0);
if(UNLIKELY(flags == -1))
throw LinuxException("Cannot read flags from file descriptor.");
flags |= O_NONBLOCK;
auto status = fcntl(fd, F_SETFL, flags);
if(UNLIKELY(status == -1))
throw LinuxException("Can't set NON_BLOCK flag to file descriptor");
}
/**
* Reads a file descriptor with timeout.
*/
void tread()
{
throw NotYetImplemented();
}
}

View File

@ -1,12 +1,129 @@
#define CATCH_CONFIG_MAIN
#include "catch.hpp"
#include <fstream>
#include <thread>
#include "gtest/gtest.h"
#include "logging/default.cpp"
#include "utils/fswatcher.hpp"
#include "utils/signals/handler.hpp"
#include "utils/stacktrace/log.hpp"
#include "utils/terminate_handler.hpp"
using namespace std::chrono_literals;
using namespace utils;
TEST_CASE("FSWatcher init")
fs::path working_dir = "../data";
fs::path filename = "test.txt";
fs::path test_path = working_dir / filename;
void create_delete_loop(int iterations, ms action_delta)
{
for (int i = 0; i < iterations; ++i)
{
// create test file
std::ofstream outfile(test_path);
outfile.close();
std::this_thread::sleep_for(action_delta);
// remove test file
fs::remove(test_path);
std::this_thread::sleep_for(action_delta);
}
}
void modify_loop(int iterations, ms action_delta)
{
// create test file
std::ofstream outfile(test_path);
outfile.close();
std::this_thread::sleep_for(action_delta);
// append TEST multiple times
for (int i = 0; i < iterations; ++i)
{
outfile.open(test_path, std::ios_base::app);
outfile << "TEST" << i;
outfile.close();
std::this_thread::sleep_for(action_delta);
}
// remove test file
fs::remove(test_path);
std::this_thread::sleep_for(action_delta);
}
TEST(FSWatcherTest, CreateDeleteLoop)
{
FSWatcher watcher;
watcher.init();
// parameters
int iterations = 10;
int created_no = 0;
int deleted_no = 0;
// time distance between two actions should be big enough otherwise
// one event will hide another one
ms action_delta = watcher.check_interval() * 2;
// watchers
watcher.watch(WatchDescriptor(working_dir, FSEventType::Created),
[&](FSEvent) {});
watcher.watch(WatchDescriptor(working_dir, FSEventType::Deleted),
[&](FSEvent) {});
// above watchers should be ignored
watcher.watch(WatchDescriptor(working_dir, FSEventType::All),
[&](FSEvent event) {
if (event.type == FSEventType::Created) created_no++;
if (event.type == FSEventType::Deleted) deleted_no++;
});
ASSERT_EQ(watcher.size(), 1);
create_delete_loop(iterations, action_delta);
ASSERT_EQ(created_no, iterations);
ASSERT_EQ(deleted_no, iterations);
watcher.unwatchAll();
ASSERT_EQ(watcher.size(), 0);
watcher.unwatchAll();
ASSERT_EQ(watcher.size(), 0);
create_delete_loop(iterations, action_delta);
ASSERT_EQ(created_no, iterations);
ASSERT_EQ(deleted_no, iterations);
}
TEST(FSWatcherTest, ModifiyLoop)
{
FSWatcher watcher;
// parameters
int iterations = 10;
int modified_no = 0;
ms action_delta = watcher.check_interval() * 2;
watcher.watch(WatchDescriptor(working_dir, FSEventType::Modified),
[&](FSEvent) { modified_no++; });
ASSERT_EQ(watcher.size(), 1);
modify_loop(iterations, action_delta);
ASSERT_EQ(modified_no, iterations);
watcher.unwatch(WatchDescriptor(working_dir, FSEventType::Modified));
ASSERT_EQ(watcher.size(), 0);
watcher.unwatch(WatchDescriptor(working_dir, FSEventType::Modified));
ASSERT_EQ(watcher.size(), 0);
modify_loop(iterations, action_delta);
ASSERT_EQ(modified_no, iterations);
}
int main(int argc, char **argv)
{
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}