Implement buffering for utils file wrappers

Summary:
This change increases the throughput of the storage v2 durability 20x. With
this change, the storage v2 durability is 3x faster than the storage v1
durability in both recovery and snapshotting (before the change v2 durability
is slower than v1 durability).

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2529
This commit is contained in:
Matej Ferencevic 2019-11-04 16:58:18 +01:00
parent 47d6196b32
commit a835939a6e
2 changed files with 174 additions and 62 deletions

View File

@ -5,6 +5,7 @@
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
#include <fstream>
#include <type_traits>
@ -73,8 +74,20 @@ static_assert(std::is_same_v<off_t, ssize_t>, "off_t must fit into ssize_t!");
InputFile::~InputFile() { Close(); }
InputFile::InputFile(InputFile &&other) noexcept
: fd_(other.fd_), path_(std::move(other.path_)) {
: fd_(other.fd_),
path_(std::move(other.path_)),
file_size_(other.file_size_),
file_position_(other.file_position_),
buffer_start_(other.buffer_start_),
buffer_size_(other.buffer_size_),
buffer_position_(other.buffer_position_) {
memcpy(buffer_, other.buffer_, kFileBufferSize);
other.fd_ = -1;
other.file_size_ = 0;
other.file_position_ = 0;
other.buffer_start_ = std::nullopt;
other.buffer_size_ = 0;
other.buffer_position_ = 0;
}
InputFile &InputFile::operator=(InputFile &&other) noexcept {
@ -82,8 +95,19 @@ InputFile &InputFile::operator=(InputFile &&other) noexcept {
fd_ = other.fd_;
path_ = std::move(other.path_);
file_size_ = other.file_size_;
file_position_ = other.file_position_;
buffer_start_ = other.buffer_start_;
buffer_size_ = other.buffer_size_;
buffer_position_ = other.buffer_position_;
memcpy(buffer_, other.buffer_, kFileBufferSize);
other.fd_ = -1;
other.file_size_ = 0;
other.file_position_ = 0;
other.buffer_start_ = std::nullopt;
other.buffer_size_ = 0;
other.buffer_position_ = 0;
return *this;
}
@ -105,7 +129,17 @@ bool InputFile::Open(const std::filesystem::path &path) {
}
}
return fd_ != -1;
if (fd_ == -1) return false;
// Get file size.
auto size = SetPosition(Position::RELATIVE_TO_END, 0);
if (!size || !SetPosition(Position::SET, 0)) {
Close();
return false;
}
file_size_ = *size;
return true;
}
bool InputFile::IsOpen() const { return fd_ != -1; }
@ -116,55 +150,45 @@ bool InputFile::Read(uint8_t *data, size_t size) {
size_t offset = 0;
while (size > 0) {
auto got = read(fd_, data + offset, size);
if (got == -1 && errno == EINTR) {
auto buffer_left = buffer_size_ - buffer_position_;
if (!buffer_start_ || buffer_left == 0) {
if (!LoadBuffer()) return false;
continue;
}
if (got <= 0) {
return false;
}
size -= got;
offset += got;
auto to_copy = size < buffer_left ? size : buffer_left;
memcpy(data + offset, buffer_ + buffer_position_, to_copy);
size -= to_copy;
offset += to_copy;
buffer_position_ += to_copy;
}
return true;
}
bool InputFile::Peek(uint8_t *data, size_t size) {
size_t offset = 0;
auto old_buffer_start = buffer_start_;
auto old_buffer_position = buffer_position_;
auto real_position = GetPosition();
while (size > 0) {
auto got = read(fd_, data + offset, size);
if (got == -1 && errno == EINTR) {
continue;
}
auto ret = Read(data, size);
if (got <= 0) {
SetPosition(Position::RELATIVE_TO_CURRENT, -offset);
return false;
}
size -= got;
offset += got;
if (buffer_start_ == old_buffer_start) {
// If we are still within the same buffer (eg. the `size` was small enough),
// we don't reset the buffer and just set the buffer position to the old
// buffer position.
buffer_position_ = old_buffer_position;
} else {
SetPosition(Position::SET, real_position);
}
SetPosition(Position::RELATIVE_TO_CURRENT, -offset);
return true;
return ret;
}
std::optional<size_t> InputFile::GetSize() {
auto current = GetPosition();
if (!current) return std::nullopt;
auto size = SetPosition(Position::RELATIVE_TO_END, 0);
if (!size) return std::nullopt;
if (!SetPosition(Position::SET, *current)) return std::nullopt;
return size;
}
size_t InputFile::GetSize() { return file_size_; }
std::optional<size_t> InputFile::GetPosition() {
return SetPosition(Position::RELATIVE_TO_CURRENT, 0);
size_t InputFile::GetPosition() {
if (buffer_start_) return *buffer_start_ + buffer_position_;
return file_position_;
}
std::optional<size_t> InputFile::SetPosition(Position position,
@ -187,6 +211,10 @@ std::optional<size_t> InputFile::SetPosition(Position position,
continue;
}
if (pos < 0) return std::nullopt;
file_position_ = pos;
buffer_start_ = std::nullopt;
buffer_size_ = 0;
buffer_position_ = 0;
return pos;
}
}
@ -216,6 +244,39 @@ void InputFile::Close() noexcept {
fd_ = -1;
}
bool InputFile::LoadBuffer() {
buffer_start_ = std::nullopt;
buffer_size_ = 0;
buffer_position_ = 0;
size_t size = kFileBufferSize;
if (file_position_ + size >= file_size_) {
size = file_size_ - file_position_;
}
if (size == 0) return false;
buffer_size_ = size;
size_t offset = 0;
while (size > 0) {
auto got = read(fd_, buffer_ + offset, size);
if (got == -1 && errno == EINTR) {
continue;
}
if (got <= 0) {
return false;
}
size -= got;
offset += got;
}
buffer_start_ = file_position_;
file_position_ += buffer_size_;
return true;
}
OutputFile::~OutputFile() {
if (IsOpen()) Close();
}
@ -223,9 +284,12 @@ OutputFile::~OutputFile() {
OutputFile::OutputFile(OutputFile &&other) noexcept
: fd_(other.fd_),
written_since_last_sync_(other.written_since_last_sync_),
path_(std::move(other.path_)) {
path_(std::move(other.path_)),
buffer_position_(other.buffer_position_) {
memcpy(buffer_, other.buffer_, kFileBufferSize);
other.fd_ = -1;
other.written_since_last_sync_ = 0;
other.buffer_position_ = 0;
}
OutputFile &OutputFile::operator=(OutputFile &&other) noexcept {
@ -234,9 +298,12 @@ OutputFile &OutputFile::operator=(OutputFile &&other) noexcept {
fd_ = other.fd_;
written_since_last_sync_ = other.written_since_last_sync_;
path_ = std::move(other.path_);
buffer_position_ = other.buffer_position_;
memcpy(buffer_, other.buffer_, kFileBufferSize);
other.fd_ = -1;
other.written_since_last_sync_ = 0;
other.buffer_position_ = 0;
return *this;
}
@ -275,28 +342,21 @@ bool OutputFile::IsOpen() const { return fd_ != -1; }
const std::filesystem::path &OutputFile::path() const { return path_; }
void OutputFile::Write(const char *data, size_t size) {
void OutputFile::Write(const uint8_t *data, size_t size) {
while (size > 0) {
auto written = write(fd_, data, size);
if (written == -1 && errno == EINTR) {
continue;
}
CHECK(written > 0)
<< "While trying to write to " << path_
<< " an error occurred: " << strerror(errno) << " (" << errno
<< "). Possibly " << size
<< " bytes of data were lost from this call and possibly "
<< written_since_last_sync_ << " bytes were lost from previous calls.";
size -= written;
data += written;
written_since_last_sync_ += written;
FlushBuffer(false);
auto buffer_left = kFileBufferSize - buffer_position_;
auto to_write = size < buffer_left ? size : buffer_left;
memcpy(buffer_ + buffer_position_, data, to_write);
size -= to_write;
data += to_write;
buffer_position_ += to_write;
written_since_last_sync_ += to_write;
}
}
void OutputFile::Write(const uint8_t *data, size_t size) {
Write(reinterpret_cast<const char *>(data), size);
void OutputFile::Write(const char *data, size_t size) {
Write(reinterpret_cast<const uint8_t *>(data), size);
}
void OutputFile::Write(const std::string_view &data) {
Write(data.data(), data.size());
@ -307,6 +367,8 @@ size_t OutputFile::GetPosition() {
}
size_t OutputFile::SetPosition(Position position, ssize_t offset) {
FlushBuffer(true);
int whence;
switch (position) {
case Position::SET:
@ -332,6 +394,8 @@ size_t OutputFile::SetPosition(Position position, ssize_t offset) {
}
void OutputFile::Sync() {
FlushBuffer(true);
int ret = 0;
while (true) {
ret = fsync(fd_);
@ -378,6 +442,8 @@ void OutputFile::Sync() {
}
void OutputFile::Close() noexcept {
FlushBuffer(true);
int ret = 0;
while (true) {
ret = close(fd_);
@ -400,4 +466,32 @@ void OutputFile::Close() noexcept {
written_since_last_sync_ = 0;
}
void OutputFile::FlushBuffer(bool force_flush) {
CHECK(IsOpen());
if (!force_flush && buffer_position_ < kFileBufferSize) return;
CHECK(buffer_position_ <= kFileBufferSize)
<< "While trying to write to " << path_
<< " more file was written to the buffer than the buffer has space!";
auto *buffer = buffer_;
while (buffer_position_ > 0) {
auto written = write(fd_, buffer, buffer_position_);
if (written == -1 && errno == EINTR) {
continue;
}
CHECK(written > 0)
<< "While trying to write to " << path_
<< " an error occurred: " << strerror(errno) << " (" << errno
<< "). Possibly " << buffer_position_
<< " bytes of data were lost from this call and possibly "
<< written_since_last_sync_ << " bytes were lost from previous calls.";
buffer_position_ -= written;
buffer += written;
}
}
} // namespace utils

View File

@ -45,6 +45,13 @@ bool CopyFile(const std::filesystem::path &src,
bool RenamePath(const std::filesystem::path &src,
const std::filesystem::path &dst);
/// Buffer size used for `InputFile` and `OutputFile` implementations. Using
/// system calls is very expensive and we can't afford to call either `read` or
/// `write` for each of our (very small) logical reads/writes. Because of that,
/// `read` or `write` is only called when the buffer is full and/or needs
/// emptying.
const size_t kFileBufferSize = 262144;
/// This class implements a file handler that is used to read binary files. It
/// was developed because the C++ standard library has an awful API and makes
/// handling of binary data extremely tedious.
@ -89,13 +96,11 @@ class InputFile {
/// doesn't change the current position in the file.
bool Peek(uint8_t *data, size_t size);
/// This method gets the size of the file. On failure it returns
/// `std::nullopt`.
std::optional<size_t> GetSize();
/// This method gets the size of the file.
size_t GetSize();
/// This method gets the current absolute position in the file. On failure it
/// returns `std::nullopt`.
std::optional<size_t> GetPosition();
/// This method gets the current absolute position in the file.
size_t GetPosition();
/// This method sets the current position in the file and returns the absolute
/// set position in the file. The position is set to `offset` with the
@ -107,8 +112,17 @@ class InputFile {
void Close() noexcept;
private:
bool LoadBuffer();
int fd_{-1};
std::filesystem::path path_;
size_t file_size_{0};
size_t file_position_{0};
uint8_t buffer_[kFileBufferSize];
std::optional<size_t> buffer_start_;
size_t buffer_size_{0};
size_t buffer_position_{0};
};
/// This class implements a file handler that is used for mission critical files
@ -171,8 +185,8 @@ class OutputFile {
/// Writes data to the currently opened file. On failure and misuse it crashes
/// the program.
void Write(const char *data, size_t size);
void Write(const uint8_t *data, size_t size);
void Write(const char *data, size_t size);
void Write(const std::string_view &data);
/// This method gets the current absolute position in the file. On failure and
@ -194,9 +208,13 @@ class OutputFile {
void Close() noexcept;
private:
void FlushBuffer(bool force_flush);
int fd_{-1};
size_t written_since_last_sync_{0};
std::filesystem::path path_;
uint8_t buffer_[kFileBufferSize];
size_t buffer_position_{0};
};
} // namespace utils