Better CSVReader memory usage

This commit is contained in:
Gareth Lloyd 2024-03-04 17:03:25 +00:00
parent de2e2048ef
commit be66f03cc8
3 changed files with 29 additions and 13 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd. // Copyright 2024 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -119,6 +119,8 @@ class Reader {
auto GetHeader() const -> Header const &; auto GetHeader() const -> Header const &;
auto GetNextRow(utils::MemoryResource *mem) -> std::optional<Row>; auto GetNextRow(utils::MemoryResource *mem) -> std::optional<Row>;
void Reset();
private: private:
// Some implementation issues that need clearing up, but this is mainly because // Some implementation issues that need clearing up, but this is mainly because
// I don't want `boost/iostreams/filtering_stream.hpp` included in this header file // I don't want `boost/iostreams/filtering_stream.hpp` included in this header file

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd. // Copyright 2024 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -34,6 +34,10 @@ struct Reader::impl {
[[nodiscard]] bool HasHeader() const { return read_config_.with_header; } [[nodiscard]] bool HasHeader() const { return read_config_.with_header; }
[[nodiscard]] auto Header() const -> Header const & { return header_; } [[nodiscard]] auto Header() const -> Header const & { return header_; }
// Releases the storage held by the reusable line buffer.
// `line_buffer_` is the member string that GetNextLine() reads each line into.
void Reset() {
// Drop the buffered contents first so the shrink request below has nothing to keep.
line_buffer_.clear();
// Non-binding request to reduce capacity to fit the (now empty) contents.
line_buffer_.shrink_to_fit();
}
auto GetNextRow(utils::MemoryResource *mem) -> std::optional<Reader::Row>; auto GetNextRow(utils::MemoryResource *mem) -> std::optional<Reader::Row>;
@ -42,7 +46,7 @@ struct Reader::impl {
void TryInitializeHeader(); void TryInitializeHeader();
std::optional<utils::pmr::string> GetNextLine(utils::MemoryResource *mem); bool GetNextLine();
ParsingResult ParseHeader(); ParsingResult ParseHeader();
@ -55,6 +59,8 @@ struct Reader::impl {
Config read_config_; Config read_config_;
uint64_t line_count_{1}; uint64_t line_count_{1};
uint16_t number_of_columns_{0}; uint16_t number_of_columns_{0};
uint64_t estimated_number_of_columns_{0};
utils::pmr::string line_buffer_{memory_};
Reader::Header header_{memory_}; Reader::Header header_{memory_};
}; };
@ -129,17 +135,16 @@ void Reader::impl::InitializeStream() {
MG_ASSERT(csv_stream_.is_complete(), "Should be 'complete' for correct operation"); MG_ASSERT(csv_stream_.is_complete(), "Should be 'complete' for correct operation");
} }
std::optional<utils::pmr::string> Reader::impl::GetNextLine(utils::MemoryResource *mem) { bool Reader::impl::GetNextLine() {
utils::pmr::string line(mem); if (!std::getline(csv_stream_, line_buffer_)) {
if (!std::getline(csv_stream_, line)) {
// reached end of file or an I/O error occurred // reached end of file or an I/O error occurred
if (!csv_stream_.good()) { if (!csv_stream_.good()) {
csv_stream_.reset(); // this will close the file_stream_ and clear the chain csv_stream_.reset(); // this will close the file_stream_ and clear the chain
} }
return std::nullopt; return false;
} }
++line_count_; ++line_count_;
return std::move(line); return true;
} }
Reader::ParsingResult Reader::impl::ParseHeader() { Reader::ParsingResult Reader::impl::ParseHeader() {
@ -170,6 +175,8 @@ void Reader::impl::TryInitializeHeader() {
const Reader::Header &Reader::GetHeader() const { return pimpl->Header(); } const Reader::Header &Reader::GetHeader() const { return pimpl->Header(); }
// Public entry point for resetting the reader; forwards to the pimpl's Reset().
void Reader::Reset() { pimpl->Reset(); }
namespace { namespace {
enum class CsvParserState : uint8_t { INITIAL_FIELD, NEXT_FIELD, QUOTING, EXPECT_DELIMITER, DONE }; enum class CsvParserState : uint8_t { INITIAL_FIELD, NEXT_FIELD, QUOTING, EXPECT_DELIMITER, DONE };
@ -179,6 +186,8 @@ Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
utils::pmr::vector<utils::pmr::string> row(mem); utils::pmr::vector<utils::pmr::string> row(mem);
if (number_of_columns_ != 0) { if (number_of_columns_ != 0) {
row.reserve(number_of_columns_); row.reserve(number_of_columns_);
} else if (estimated_number_of_columns_ != 0) {
row.reserve(estimated_number_of_columns_);
} }
utils::pmr::string column(memory_); utils::pmr::string column(memory_);
@ -186,13 +195,12 @@ Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
auto state = CsvParserState::INITIAL_FIELD; auto state = CsvParserState::INITIAL_FIELD;
do { do {
const auto maybe_line = GetNextLine(mem); if (!GetNextLine()) {
if (!maybe_line) {
// The whole file was processed. // The whole file was processed.
break; break;
} }
std::string_view line_string_view = *maybe_line; std::string_view line_string_view = line_buffer_;
// remove '\r' from the end in case we have dos file format // remove '\r' from the end in case we have dos file format
if (line_string_view.back() == '\r') { if (line_string_view.back() == '\r') {
@ -312,6 +320,11 @@ Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
fmt::format("Expected {:d} columns in row {:d}, but got {:d}", number_of_columns_, fmt::format("Expected {:d} columns in row {:d}, but got {:d}", number_of_columns_,
line_count_ - 1, row.size())); line_count_ - 1, row.size()));
} }
// To avoid unnecessary dynamic growth of the row, remember the number of
// columns for future calls
if (number_of_columns_ == 0 && estimated_number_of_columns_ == 0) {
estimated_number_of_columns_ = row.size();
}
return std::move(row); return std::move(row);
} }
@ -319,7 +332,7 @@ Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
std::optional<Reader::Row> Reader::impl::GetNextRow(utils::MemoryResource *mem) { std::optional<Reader::Row> Reader::impl::GetNextRow(utils::MemoryResource *mem) {
auto row = ParseRow(mem); auto row = ParseRow(mem);
if (row.HasError()) { if (row.HasError()) [[unlikely]] {
if (!read_config_.ignore_bad) { if (!read_config_.ignore_bad) {
throw CsvReadException("CSV Reader: Bad row at line {:d}: {}", line_count_ - 1, row.GetError().message); throw CsvReadException("CSV Reader: Bad row at line {:d}: {}", line_count_ - 1, row.GetError().message);
} }
@ -333,7 +346,7 @@ std::optional<Reader::Row> Reader::impl::GetNextRow(utils::MemoryResource *mem)
} while (row.HasError()); } while (row.HasError());
} }
if (row->empty()) { if (row->empty()) [[unlikely]] {
// reached end of file // reached end of file
return std::nullopt; return std::nullopt;
} }

View File

@ -5336,6 +5336,7 @@ class LoadCsvCursor : public Cursor {
"1"); "1");
} }
did_pull_ = true; did_pull_ = true;
reader_->Reset();
} }
auto row = reader_->GetNextRow(context.evaluation_context.memory); auto row = reader_->GetNextRow(context.evaluation_context.memory);