Add utils::InputFile wrapper

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2377
This commit is contained in:
Matej Ferencevic 2019-09-12 14:49:45 +02:00
parent b74141aafe
commit 51fc4df87a
5 changed files with 339 additions and 59 deletions

View File

@ -91,7 +91,8 @@ void Log::ReopenLog() {
if (!started_.load(std::memory_order_relaxed)) return;
std::lock_guard<std::mutex> guard(lock_);
if (log_.IsOpen()) log_.Close();
log_.Open(storage_directory_ / "audit.log");
log_.Open(storage_directory_ / "audit.log",
utils::OutputFile::Mode::APPEND_TO_EXISTING);
}
void Log::Flush() {

View File

@ -60,7 +60,7 @@ class Log {
std::optional<RingBuffer<Item>> buffer_;
utils::Scheduler scheduler_;
utils::LogFile log_;
utils::OutputFile log_;
std::mutex lock_;
};

View File

@ -6,6 +6,7 @@
#include <unistd.h>
#include <fstream>
#include <type_traits>
#include <glog/logging.h>
@ -50,34 +51,182 @@ bool CopyFile(const std::filesystem::path &src,
return std::filesystem::copy_file(src, dst, error_code);
}
LogFile::~LogFile() {
static_assert(std::is_same_v<off_t, ssize_t>, "off_t must fit into ssize_t!");
InputFile::~InputFile() {
if (IsOpen()) Close();
}
LogFile::LogFile(LogFile &&other)
: fd_(other.fd_),
written_since_last_sync_(other.written_since_last_sync_),
path_(other.path_) {
InputFile::InputFile(InputFile &&other) noexcept
: fd_(other.fd_), path_(std::move(other.path_)) {
other.fd_ = -1;
other.written_since_last_sync_ = 0;
other.path_ = "";
}
LogFile &LogFile::operator=(LogFile &&other) {
InputFile &InputFile::operator=(InputFile &&other) noexcept {
if (IsOpen()) Close();
fd_ = other.fd_;
written_since_last_sync_ = other.written_since_last_sync_;
path_ = other.path_;
path_ = std::move(other.path_);
other.fd_ = -1;
other.written_since_last_sync_ = 0;
other.path_ = "";
return *this;
}
void LogFile::Open(const std::filesystem::path &path) {
void InputFile::Open(const std::filesystem::path &path) {
CHECK(!IsOpen())
<< "While trying to open " << path
<< " for writing the database used a handle that already has " << path_
<< " opened in it!";
path_ = path;
while (true) {
fd_ = open(path_.c_str(), O_RDONLY | O_CLOEXEC);
if (fd_ == -1 && errno == EINTR) {
// The call was interrupted, try again...
continue;
} else {
// All other possible errors are fatal errors and are handled in the CHECK
// below.
break;
}
}
CHECK(fd_ != -1) << "While trying to open " << path_
<< " for reading an error occurred: " << strerror(errno)
<< " (" << errno << ").";
}
bool InputFile::IsOpen() const { return fd_ != -1; }
const std::filesystem::path &InputFile::path() const { return path_; }
bool InputFile::Read(uint8_t *data, size_t size) {
size_t offset = 0;
while (size > 0) {
auto got = read(fd_, data + offset, size);
if (got == -1 && errno == EINTR) {
continue;
}
if (got <= 0) {
return false;
}
size -= got;
offset += got;
}
return true;
}
bool InputFile::Peek(uint8_t *data, size_t size) {
size_t offset = 0;
while (size > 0) {
auto got = read(fd_, data + offset, size);
if (got == -1 && errno == EINTR) {
continue;
}
if (got <= 0) {
SetPosition(Position::RELATIVE_TO_CURRENT, -offset);
return false;
}
size -= got;
offset += got;
}
SetPosition(Position::RELATIVE_TO_CURRENT, -offset);
return true;
}
size_t InputFile::GetSize() {
size_t current = GetPosition();
size_t size = SetPosition(Position::RELATIVE_TO_END, 0);
SetPosition(Position::SET, current);
return size;
}
size_t InputFile::GetPosition() {
return SetPosition(Position::RELATIVE_TO_CURRENT, 0);
}
size_t InputFile::SetPosition(Position position, ssize_t offset) {
int whence;
switch (position) {
case Position::SET:
whence = SEEK_SET;
break;
case Position::RELATIVE_TO_CURRENT:
whence = SEEK_CUR;
break;
case Position::RELATIVE_TO_END:
whence = SEEK_END;
break;
}
while (true) {
auto pos = lseek(fd_, offset, whence);
if (pos == -1 && errno == EINTR) {
continue;
}
CHECK(pos >= 0) << "While trying to set the position in " << path_
<< " an error occurred: " << strerror(errno) << " ("
<< errno << ").";
return pos;
}
}
void InputFile::Close() noexcept {
int ret = 0;
while (true) {
ret = close(fd_);
if (ret == -1 && errno == EINTR) {
// The call was interrupted, try again...
continue;
} else {
// All other possible errors are fatal errors and are handled in the CHECK
// below.
break;
}
}
CHECK(ret == 0) << "While trying to close " << path_
<< " an error occurred: " << strerror(errno) << " (" << errno
<< ").";
fd_ = -1;
}
OutputFile::~OutputFile() {
if (IsOpen()) Close();
}
OutputFile::OutputFile(OutputFile &&other) noexcept
: fd_(other.fd_),
written_since_last_sync_(other.written_since_last_sync_),
path_(std::move(other.path_)) {
other.fd_ = -1;
other.written_since_last_sync_ = 0;
}
OutputFile &OutputFile::operator=(OutputFile &&other) noexcept {
if (IsOpen()) Close();
fd_ = other.fd_;
written_since_last_sync_ = other.written_since_last_sync_;
path_ = std::move(other.path_);
other.fd_ = -1;
other.written_since_last_sync_ = 0;
return *this;
}
void OutputFile::Open(const std::filesystem::path &path, Mode mode) {
CHECK(!IsOpen())
<< "While trying to open " << path
<< " for writing the database used a handle that already has " << path_
@ -86,9 +235,12 @@ void LogFile::Open(const std::filesystem::path &path) {
path_ = path;
written_since_last_sync_ = 0;
int flags = O_WRONLY | O_CLOEXEC | O_CREAT;
if (mode == Mode::APPEND_TO_EXISTING) flags |= O_APPEND;
while (true) {
// The permissions are set to ((rw-r-----) & ~umask)
fd_ = open(path_.c_str(), O_WRONLY | O_CLOEXEC | O_CREAT | O_APPEND, 0640);
fd_ = open(path_.c_str(), flags, 0640);
if (fd_ == -1 && errno == EINTR) {
// The call was interrupted, try again...
continue;
@ -104,11 +256,11 @@ void LogFile::Open(const std::filesystem::path &path) {
<< " (" << errno << ").";
}
bool LogFile::IsOpen() const { return fd_ != -1; }
bool OutputFile::IsOpen() const { return fd_ != -1; }
const std::filesystem::path &LogFile::path() const { return path_; }
const std::filesystem::path &OutputFile::path() const { return path_; }
void LogFile::Write(const char *data, size_t size) {
void OutputFile::Write(const char *data, size_t size) {
while (size > 0) {
auto written = write(fd_, data, size);
if (written == -1 && errno == EINTR) {
@ -128,14 +280,43 @@ void LogFile::Write(const char *data, size_t size) {
}
}
void LogFile::Write(const uint8_t *data, size_t size) {
void OutputFile::Write(const uint8_t *data, size_t size) {
Write(reinterpret_cast<const char *>(data), size);
}
void LogFile::Write(const std::string &data) {
void OutputFile::Write(const std::string_view &data) {
Write(data.data(), data.size());
}
void LogFile::Sync() {
size_t OutputFile::GetPosition() {
return SetPosition(Position::RELATIVE_TO_CURRENT, 0);
}
size_t OutputFile::SetPosition(Position position, ssize_t offset) {
int whence;
switch (position) {
case Position::SET:
whence = SEEK_SET;
break;
case Position::RELATIVE_TO_CURRENT:
whence = SEEK_CUR;
break;
case Position::RELATIVE_TO_END:
whence = SEEK_END;
break;
}
while (true) {
auto pos = lseek(fd_, offset, whence);
if (pos == -1 && errno == EINTR) {
continue;
}
CHECK(pos >= 0) << "While trying to set the position in " << path_
<< " an error occurred: " << strerror(errno) << " ("
<< errno << ").";
return pos;
}
}
void OutputFile::Sync() {
int ret = 0;
while (true) {
ret = fsync(fd_);
@ -181,7 +362,7 @@ void LogFile::Sync() {
written_since_last_sync_ = 0;
}
void LogFile::Close() {
void OutputFile::Close() noexcept {
int ret = 0;
while (true) {
ret = close(fd_);
@ -202,7 +383,6 @@ void LogFile::Close() {
fd_ = -1;
written_since_last_sync_ = 0;
path_ = "";
}
} // namespace utils

View File

@ -1,13 +1,14 @@
/**
* @file
*
* This file contains utilities for operations with files. Other than utility
* functions, a `File` class is provided which emulates a `fstream`.
* This file contains utilities for operations with files.
* `InputFile` and `OutputFile` classes are provided which emulate a `fstream`.
*/
#pragma once
#include <filesystem>
#include <string>
#include <string_view>
#include <vector>
namespace utils {
@ -32,6 +33,73 @@ bool DeleteDir(const std::filesystem::path &dir) noexcept;
bool CopyFile(const std::filesystem::path &src,
const std::filesystem::path &dst) noexcept;
/// This class implements a file handler that is used to read binary files. It
/// was developed because the C++ standard library has an awful API and makes
/// handling of binary data extremely tedious.
///
/// This class *isn't* thread safe. It is implemented as a wrapper around low
/// level system calls used for file manipulation.
class InputFile {
public:
enum class Position {
SET,
RELATIVE_TO_CURRENT,
RELATIVE_TO_END,
};
InputFile() = default;
~InputFile();
InputFile(const InputFile &) = delete;
InputFile &operator=(const InputFile &) = delete;
InputFile(InputFile &&other) noexcept;
InputFile &operator=(InputFile &&other) noexcept;
/// This method opens the file used for reading. If the file can't be opened
/// or doesn't exist it crashes the program.
void Open(const std::filesystem::path &path);
/// Returns a boolean indicating whether a file is opened.
bool IsOpen() const;
/// Returns the path to the currently opened file. If a file isn't opened the
/// path is empty.
const std::filesystem::path &path() const;
/// Reads `size` bytes from the file into the memory pointed by `data` and
/// returns a boolean indicating whether the read succeeded. Reading the file
/// changes the current position in the file.
bool Read(uint8_t *data, size_t size);
/// Peeks `size` bytes from the file into the memory pointed by `data` and
/// returns a boolean indicating whether the peek succeeded. Peeking the file
/// doesn't change the current position in the file.
bool Peek(uint8_t *data, size_t size);
/// This method gets the size of the file. On failure and misuse it crashes
/// the program.
size_t GetSize();
/// This method gets the current absolute position in the file. On failure and
/// misuse it crashes the program.
size_t GetPosition();
/// This method sets the current position in the file and returns the absolute
/// set position in the file. The position is set to `offset` with the
/// starting point taken from `position`. On failure and misuse it crashes the
/// program.
size_t SetPosition(Position position, ssize_t offset);
/// Closes the currently opened file. On failure and misuse it crashes the
/// program.
void Close() noexcept;
private:
int fd_{-1};
std::filesystem::path path_;
};
/// This class implements a file handler that is used for mission critical files
/// that need to be written and synced to permanent storage. Typical usage for
/// this class is in implementation of write-ahead logging or anything similar
@ -54,22 +122,34 @@ bool CopyFile(const std::filesystem::path &src,
///
/// This class *isn't* thread safe. It is implemented as a wrapper around low
/// level system calls used for file manipulation.
class LogFile {
class OutputFile {
public:
LogFile() = default;
~LogFile();
enum class Mode {
OVERWRITE_EXISTING,
APPEND_TO_EXISTING,
};
LogFile(const LogFile &) = delete;
LogFile &operator=(const LogFile &) = delete;
enum class Position {
SET,
RELATIVE_TO_CURRENT,
RELATIVE_TO_END,
};
LogFile(LogFile &&other);
LogFile &operator=(LogFile &&other);
OutputFile() = default;
~OutputFile();
OutputFile(const OutputFile &) = delete;
OutputFile &operator=(const OutputFile &) = delete;
OutputFile(OutputFile &&other) noexcept;
OutputFile &operator=(OutputFile &&other) noexcept;
/// This method opens a new file used for writing. If the file doesn't exist
/// it is created and if the file exists data is appended to the file to
/// ensure that no data is ever lost. Files are created with a restrictive
/// permission mask (0640). On failure and misuse it crashes the program.
void Open(const std::filesystem::path &path);
/// it is created. The `mode` flags controls whether data is appended to the
/// file or the file is wiped on first write. Files are created with a
/// restrictive permission mask (0640). On failure and misuse it crashes the
/// program.
void Open(const std::filesystem::path &path, Mode mode);
/// Returns a boolean indicating whether a file is opened.
bool IsOpen() const;
@ -82,7 +162,17 @@ class LogFile {
/// the program.
void Write(const char *data, size_t size);
void Write(const uint8_t *data, size_t size);
void Write(const std::string &data);
void Write(const std::string_view &data);
/// This method gets the current absolute position in the file. On failure and
/// misuse it crashes the program.
size_t GetPosition();
/// This method sets the current position in the file and returns the absolute
/// set position in the file. The position is set to `offset` with the
/// starting point taken from `position`. On failure and misuse it crashes the
/// program.
size_t SetPosition(Position position, ssize_t offset);
/// Syncs currently pending data to the currently opened file. On failure
/// and misuse it crashes the program.
@ -90,7 +180,7 @@ class LogFile {
/// Closes the currently opened file. It doesn't perform a `Sync` on the
/// file. On failure and misuse it crashes the program.
void Close();
void Close() noexcept;
private:
int fd_{-1};

View File

@ -198,14 +198,17 @@ TEST_F(UtilsFileTest, EnsureDirOrDie) {
}
}
TEST_F(UtilsFileTest, LogFileExisting) {
TEST_F(UtilsFileTest, OutputFileExisting) {
for (const auto &dir : kDirsAll) {
for (const auto &file : kFilesAll) {
utils::LogFile handle;
utils::OutputFile handle;
if (utils::EndsWith(dir, "000") || utils::EndsWith(file, "000")) {
ASSERT_DEATH(handle.Open(storage / dir / file), "");
ASSERT_DEATH(handle.Open(storage / dir / file,
utils::OutputFile::Mode::APPEND_TO_EXISTING),
"");
} else {
handle.Open(storage / dir / file);
handle.Open(storage / dir / file,
utils::OutputFile::Mode::APPEND_TO_EXISTING);
ASSERT_TRUE(handle.IsOpen());
ASSERT_EQ(handle.path(), storage / dir / file);
handle.Write("hello world!\n", 13);
@ -216,14 +219,15 @@ TEST_F(UtilsFileTest, LogFileExisting) {
}
}
TEST_F(UtilsFileTest, LogFileNew) {
TEST_F(UtilsFileTest, OutputFileNew) {
for (const auto &dir : kDirsAll) {
utils::LogFile handle;
utils::OutputFile handle;
auto path = storage / dir / "test";
if (utils::EndsWith(dir, "000")) {
ASSERT_DEATH(handle.Open(path), "");
ASSERT_DEATH(
handle.Open(path, utils::OutputFile::Mode::APPEND_TO_EXISTING), "");
} else {
handle.Open(path);
handle.Open(path, utils::OutputFile::Mode::APPEND_TO_EXISTING);
ASSERT_TRUE(handle.IsOpen());
ASSERT_EQ(handle.path(), path);
handle.Write("hello world!\n");
@ -233,24 +237,27 @@ TEST_F(UtilsFileTest, LogFileNew) {
}
}
TEST_F(UtilsFileTest, LogFileInvalidUsage) {
utils::LogFile handle;
TEST_F(UtilsFileTest, OutputFileInvalidUsage) {
utils::OutputFile handle;
ASSERT_DEATH(handle.Write("hello!"), "");
ASSERT_DEATH(handle.Sync(), "");
ASSERT_DEATH(handle.Close(), "");
handle.Open(storage / "existing_dir_777" / "existing_file_777");
ASSERT_DEATH(handle.Open(storage / "existing_dir_770" / "existing_file_770"),
handle.Open(storage / "existing_dir_777" / "existing_file_777",
utils::OutputFile::Mode::APPEND_TO_EXISTING);
ASSERT_DEATH(handle.Open(storage / "existing_dir_770" / "existing_file_770",
utils::OutputFile::Mode::APPEND_TO_EXISTING),
"");
handle.Write("hello!");
handle.Sync();
handle.Close();
}
TEST_F(UtilsFileTest, LogFileMove) {
utils::LogFile original;
original.Open(storage / "existing_dir_777" / "existing_file_777");
TEST_F(UtilsFileTest, OutputFileMove) {
utils::OutputFile original;
original.Open(storage / "existing_dir_777" / "existing_file_777",
utils::OutputFile::Mode::APPEND_TO_EXISTING);
utils::LogFile moved(std::move(original));
utils::OutputFile moved(std::move(original));
ASSERT_DEATH(original.Write("hello!"), "");
ASSERT_DEATH(original.Sync(), "");
@ -264,13 +271,15 @@ TEST_F(UtilsFileTest, LogFileMove) {
moved.Sync();
moved.Close();
original.Open(storage / "existing_dir_770" / "existing_file_770");
original.Open(storage / "existing_dir_770" / "existing_file_770",
utils::OutputFile::Mode::APPEND_TO_EXISTING);
original.Close();
}
TEST_F(UtilsFileTest, LogFileDescriptorLeackage) {
TEST_F(UtilsFileTest, OutputFileDescriptorLeackage) {
for (int i = 0; i < 100000; ++i) {
utils::LogFile handle;
handle.Open(storage / "existing_dir_777" / "existing_file_777");
utils::OutputFile handle;
handle.Open(storage / "existing_dir_777" / "existing_file_777",
utils::OutputFile::Mode::APPEND_TO_EXISTING);
}
}