diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index 3fdb20841..abd999b26 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -1,5 +1,6 @@ set(utils_src_files file.cpp + file_locker.cpp memory.cpp signals.cpp thread.cpp diff --git a/src/utils/file_locker.cpp b/src/utils/file_locker.cpp new file mode 100644 index 000000000..2ad635a4f --- /dev/null +++ b/src/utils/file_locker.cpp @@ -0,0 +1,100 @@ +#include "utils/file_locker.hpp" + +namespace utils { + +namespace { +void DeleteFromSystem(const std::filesystem::path &path) { + if (!utils::DeleteFile(path)) { + LOG(WARNING) << "Couldn't delete file " << path << "!"; + } +} +} // namespace + +////// FileRetainer ////// +void FileRetainer::DeleteFile(const std::filesystem::path &path) { + if (active_accessors_.load()) { + files_for_deletion_.WithLock([&](auto &files) { files.emplace(path); }); + return; + } + std::unique_lock guard(main_lock_); + DeleteOrAddToQueue(path); +} + +FileRetainer::FileLocker FileRetainer::AddLocker() { + const size_t current_locker_id = next_locker_id_.fetch_add(1); + lockers_.WithLock([&](auto &lockers) { + lockers.emplace(current_locker_id, std::set{}); + }); + return FileLocker{this, current_locker_id}; +} + +FileRetainer::~FileRetainer() { + CHECK(files_for_deletion_->empty()) << "Files weren't properly deleted"; +} + +[[nodiscard]] bool FileRetainer::FileLocked(const std::filesystem::path &path) { + return lockers_.WithLock([&](auto &lockers) { + for (const auto &[_, paths] : lockers) { + if (paths.count(path)) { + return true; + } + } + return false; + }); +} + +void FileRetainer::DeleteOrAddToQueue(const std::filesystem::path &path) { + if (FileLocked(path)) { + files_for_deletion_.WithLock([&](auto &files) { files.emplace(path); }); + } else { + DeleteFromSystem(path); + } +} + +void FileRetainer::CleanQueue() { + files_for_deletion_.WithLock([&](auto &files) { + for (auto it = files.cbegin(); it != files.cend();) { + if (!FileLocked(*it)) { + DeleteFromSystem(*it); + it = files.erase(it); + } else { + ++it; + } + } + }); +} + +////// FileLocker ////// +FileRetainer::FileLocker::~FileLocker() { + file_retainer_->lockers_.WithLock( + [this](auto &lockers) { lockers.erase(locker_id_); }); + std::unique_lock guard(file_retainer_->main_lock_); + file_retainer_->CleanQueue(); +} + +FileRetainer::FileLockerAccessor FileRetainer::FileLocker::Access() { + return FileLockerAccessor{file_retainer_, locker_id_}; +} + +////// FileLockerAccessor ////// +FileRetainer::FileLockerAccessor::FileLockerAccessor(FileRetainer *retainer, + size_t locker_id) + : file_retainer_{retainer}, + retainer_guard_{retainer->main_lock_}, + locker_id_{locker_id} { + file_retainer_->active_accessors_.fetch_add(1); +} + +bool FileRetainer::FileLockerAccessor::AddFile( + const std::filesystem::path &path) { + if (!std::filesystem::exists(path)) return false; + file_retainer_->lockers_.WithLock( + [&](auto &lockers) { lockers[locker_id_].emplace(path); }); + return true; +} + +FileRetainer::FileLockerAccessor::~FileLockerAccessor() { + file_retainer_->active_accessors_.fetch_sub(1); +} + +} // namespace utils diff --git a/src/utils/file_locker.hpp b/src/utils/file_locker.hpp new file mode 100644 index 000000000..3645a2344 --- /dev/null +++ b/src/utils/file_locker.hpp @@ -0,0 +1,164 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include + +#include "utils/file.hpp" +#include "utils/rw_lock.hpp" +#include "utils/spin_lock.hpp" +#include "utils/synchronized.hpp" + +namespace utils { + +/** + * Helper class used for safer modifying and reading of files + * by preventing a deletion of a file until the file is not used in any of + * currently running threads. + * Also, while a single thread modyifies it's list of locked files, the deletion + * of ALL the files is delayed. + * + * Basic usage of FileRetainer consists of following parts: + * - Defining a global FileRetainer object which is used for locking and + * deleting of the files. + * - Each thread that wants to lock a single or multiple files first creates a + * FileLocker object. + * - Modifying a FileLocker is only possible through the FileLockerAccessor. + * - FileLockerAccessor prevents deletion of any file, so you can safely add + * multiple files to the locker with no risk of having files deleted during + * the process. + * - After a FileLocker or FileLockerAccessor is destroyed, FileRetainer scans + * the list of the files that wait to be deleted, and deletes all the files + * that are not inside any of currently present lockers. + * + * e.g. + * FileRetainer file_retainer; + * std::filesystem::path file1; + * std::filesystem::path file2; + * + * void Foo() { + * // I want to lock a list of files + * // Create a locker + * auto locker = file_retainer.AddLocker(); + * { + * // Create accessor to the locker so you can + * // add the files which need to be locked. + * // Accesor prevents deletion of any files + * // so you safely add multiple files in atomic way + * auto accessor = locker.Access(); + * accessor.AddFile(file1); + * accessor.AddFile(file2); + * } + * // DO SOMETHING WITH THE FILES + * } + * + * void Bar() { + * // I want to delete file1. + * file_retiner.DeleteFile(file1); + * } + * + * int main() { + * // Run Foo() and Bar() in different threads. + * } + * + */ +class FileRetainer { + public: + struct FileLockerAccessor; + + /** + * A single locker inside the FileRetainer that contains a list + * of files that are guarded from deletion. + */ + struct FileLocker { + friend FileRetainer; + ~FileLocker(); + + /** + * Access the FileLocker so you can modify it. + */ + FileLockerAccessor Access(); + + FileLocker(const FileLocker &) = delete; + FileLocker(FileLocker &&) = default; + FileLocker &operator=(const FileLocker &) = delete; + FileLocker &operator=(FileLocker &&) = default; + + private: + explicit FileLocker(FileRetainer *retainer, size_t locker_id) + : file_retainer_{retainer}, locker_id_{locker_id} {} + + FileRetainer *file_retainer_; + size_t locker_id_; + }; + + /** + * Accessor to the FileLocker. + * All the modification to the FileLocker are done + * using this struct. + */ + struct FileLockerAccessor { + friend FileLocker; + + /** + * Add a single file to the current locker. + */ + bool AddFile(const std::filesystem::path &path); + + FileLockerAccessor(const FileLockerAccessor &) = delete; + FileLockerAccessor(FileLockerAccessor &&) = default; + FileLockerAccessor &operator=(const FileLockerAccessor &) = delete; + FileLockerAccessor &operator=(FileLockerAccessor &&) = default; + + ~FileLockerAccessor(); + + private: + explicit FileLockerAccessor(FileRetainer *retainer, size_t locker_id); + + FileRetainer *file_retainer_; + std::shared_lock retainer_guard_; + size_t locker_id_; + }; + + /** + * Delete a file. + * If the file is inside any of the lockers or some thread is modifying + * any of the lockers, the file will be deleted after all the locks are + * lifted. + */ + void DeleteFile(const std::filesystem::path &path); + + /** + * Create and return a new locker. + */ + FileLocker AddLocker(); + + explicit FileRetainer() = default; + FileRetainer(const FileRetainer &) = delete; + FileRetainer(FileRetainer &&) = delete; + FileRetainer &operator=(const FileRetainer &) = delete; + FileRetainer &operator=(FileRetainer &&) = delete; + + ~FileRetainer(); + + private: + [[nodiscard]] bool FileLocked(const std::filesystem::path &path); + void DeleteOrAddToQueue(const std::filesystem::path &path); + void CleanQueue(); + + utils::RWLock main_lock_{RWLock::Priority::WRITE}; + + std::atomic active_accessors_{0}; + std::atomic next_locker_id_{0}; + utils::Synchronized>, + utils::SpinLock> + lockers_; + + utils::Synchronized, utils::SpinLock> + files_for_deletion_; +}; + +} // namespace utils diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 3501686ad..57440cc21 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -195,6 +195,9 @@ target_link_libraries(${test_prefix}skip_list mg-utils) add_unit_test(small_vector.cpp) target_link_libraries(${test_prefix}small_vector mg-utils) +add_unit_test(utils_file_locker.cpp) +target_link_libraries(${test_prefix}utils_file_locker mg-utils fmt) + # Test mg-storage-v2 diff --git a/tests/unit/utils_file_locker.cpp b/tests/unit/utils_file_locker.cpp new file mode 100644 index 000000000..17bdfbba1 --- /dev/null +++ b/tests/unit/utils_file_locker.cpp @@ -0,0 +1,209 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include + +using namespace std::chrono_literals; + +class FileLockerTest : public ::testing::Test { + protected: + std::filesystem::path testing_directory{ + std::filesystem::temp_directory_path() / + "MG_test_unit_utils_file_locker"}; + + void SetUp() override { Clear(); } + + void TearDown() override { Clear(); } + + void CreateFiles(const size_t files_number) { + const auto save_path = std::filesystem::current_path(); + std::filesystem::create_directory(testing_directory); + std::filesystem::current_path(testing_directory); + + for (auto i = 1; i <= files_number; ++i) { + std::ofstream file(fmt::format("{}", i)); + } + + std::filesystem::current_path(save_path); + } + + private: + void Clear() { + if (!std::filesystem::exists(testing_directory)) return; + std::filesystem::remove_all(testing_directory); + } +}; + +TEST_F(FileLockerTest, DeleteWhileLocking) { + CreateFiles(1); + utils::FileRetainer file_retainer; + auto t1 = std::thread([&]() { + auto locker = file_retainer.AddLocker(); + { + auto acc = locker.Access(); + std::this_thread::sleep_for(100ms); + } + }); + const auto file = testing_directory / "1"; + auto t2 = std::thread([&]() { + std::this_thread::sleep_for(50ms); + file_retainer.DeleteFile(file); + ASSERT_TRUE(std::filesystem::exists(file)); + }); + + t1.join(); + t2.join(); + ASSERT_FALSE(std::filesystem::exists(file)); +} + +TEST_F(FileLockerTest, DeleteWhileInLocker) { + CreateFiles(1); + utils::FileRetainer file_retainer; + const auto file = testing_directory / "1"; + auto t1 = std::thread([&]() { + auto locker = file_retainer.AddLocker(); + { + auto acc = locker.Access(); + acc.AddFile(file); + } + std::this_thread::sleep_for(100ms); + }); + + auto t2 = std::thread([&]() { + std::this_thread::sleep_for(50ms); + file_retainer.DeleteFile(file); + ASSERT_TRUE(std::filesystem::exists(file)); + }); + + t1.join(); + t2.join(); + ASSERT_FALSE(std::filesystem::exists(file)); +} + +TEST_F(FileLockerTest, MultipleLockers) { + CreateFiles(3); + utils::FileRetainer file_retainer; + const auto file1 = testing_directory / "1"; + const auto file2 = testing_directory / "2"; + const auto common_file = testing_directory / "3"; + + auto t1 = std::thread([&]() { + auto locker = file_retainer.AddLocker(); + { + auto acc = locker.Access(); + acc.AddFile(file1); + acc.AddFile(common_file); + } + }); + + auto t2 = std::thread([&]() { + auto locker = file_retainer.AddLocker(); + { + auto acc = locker.Access(); + acc.AddFile(file2); + acc.AddFile(common_file); + } + std::this_thread::sleep_for(200ms); + }); + + auto t3 = std::thread([&]() { + std::this_thread::sleep_for(50ms); + file_retainer.DeleteFile(file1); + file_retainer.DeleteFile(file2); + file_retainer.DeleteFile(common_file); + ASSERT_FALSE(std::filesystem::exists(file1)); + ASSERT_TRUE(std::filesystem::exists(file2)); + ASSERT_TRUE(std::filesystem::exists(common_file)); + }); + + t1.join(); + t2.join(); + t3.join(); + ASSERT_FALSE(std::filesystem::exists(file1)); + ASSERT_FALSE(std::filesystem::exists(file2)); + ASSERT_FALSE(std::filesystem::exists(common_file)); +} + +TEST_F(FileLockerTest, MultipleLockersAndDeleters) { + constexpr size_t files_number = 2000; + + CreateFiles(files_number); + // setup random number generator + std::random_device r; + + std::default_random_engine engine(r()); + std::uniform_int_distribution random_short_wait(1, 10); + std::uniform_int_distribution random_wait(1, 100); + std::uniform_int_distribution file_distribution(0, files_number - 1); + + const auto sleep_for = [&](int milliseconds) { + std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); + }; + + const auto random_file = [&]() { + return testing_directory / fmt::format("{}", file_distribution(engine)); + }; + + utils::FileRetainer file_retainer; + + constexpr size_t thread_num = 8; + constexpr size_t file_access_num = 800; + constexpr size_t file_delete_num = 1000; + + std::vector accessor_threads; + accessor_threads.reserve(thread_num); + for (auto i = 0; i < thread_num - 1; ++i) { + accessor_threads.emplace_back([&]() { + sleep_for(random_wait(engine)); + + std::vector locked_files; + auto locker = file_retainer.AddLocker(); + { + auto acc = locker.Access(); + for (auto i = 0; i < file_access_num; ++i) { + auto file = random_file(); + if (acc.AddFile(file)) { + ASSERT_TRUE(std::filesystem::exists(file)); + locked_files.emplace_back(std::move(file)); + } else { + ASSERT_FALSE(std::filesystem::exists(file)); + } + sleep_for(random_short_wait(engine)); + } + } + sleep_for(random_wait(engine)); + for (const auto &file : locked_files) { + ASSERT_TRUE(std::filesystem::exists(file)); + } + }); + } + + std::vector deleted_files; + auto deleter = std::thread([&]() { + sleep_for(random_short_wait(engine)); + for (auto i = 0; i < file_delete_num; ++i) { + auto file = random_file(); + if (std::filesystem::exists(file)) { + file_retainer.DeleteFile(file); + deleted_files.emplace_back(std::move(file)); + } + sleep_for(random_short_wait(engine)); + } + }); + + for (auto &thread : accessor_threads) { + thread.join(); + } + + deleter.join(); + + for (const auto &file : deleted_files) { + ASSERT_FALSE(std::filesystem::exists(file)); + } +}