From 25eb2c147a4cd728184a56e920e64cdd0cbf47dd Mon Sep 17 00:00:00 2001 From: Josip Seljan <62958579+the-joksim@users.noreply.github.com> Date: Wed, 24 Mar 2021 11:02:55 +0100 Subject: [PATCH] LOAD CSV implementation fixes (#120) * Change how csv::Reader handles memory resources * Add multiline quoted string test --- src/query/plan/operator.cpp | 25 ++++++++------ src/utils/csv_parsing.cpp | 22 ++++++------ src/utils/csv_parsing.hpp | 14 ++++---- tests/unit/utils_csv_parsing.cpp | 59 +++++++++++++++++++++++++------- 4 files changed, 80 insertions(+), 40 deletions(-) diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 2ec02787f..4de436c88 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -3713,12 +3713,13 @@ TypedValue EvaluateOptionalExpression(Expression *expression, ExpressionEvaluato auto ToOptionalString(ExpressionEvaluator *evaluator, Expression *expression) -> std::optional { const auto evaluated_expr = EvaluateOptionalExpression(expression, evaluator); if (evaluated_expr.IsString()) { - return utils::pmr::string(evaluated_expr.ValueString(), evaluator->GetMemoryResource()); + return utils::pmr::string(evaluated_expr.ValueString(), utils::NewDeleteResource()); } return std::nullopt; }; -TypedValue CsvRowToTypedList(csv::Reader::Row row, utils::MemoryResource *mem) { +TypedValue CsvRowToTypedList(csv::Reader::Row row) { + auto *mem = row.get_allocator().GetMemoryResource(); auto typed_columns = utils::pmr::vector(mem); typed_columns.reserve(row.size()); for (auto &column : row) { @@ -3727,8 +3728,9 @@ TypedValue CsvRowToTypedList(csv::Reader::Row row, utils::MemoryResource *mem) { return TypedValue(typed_columns, mem); } -TypedValue CsvRowToTypedMap(csv::Reader::Row row, csv::Reader::Header header, utils::MemoryResource *mem) { +TypedValue CsvRowToTypedMap(csv::Reader::Row row, csv::Reader::Header header) { // a valid row has the same number of elements as the header + auto *mem = row.get_allocator().GetMemoryResource(); utils::pmr::map m(mem); for (auto i = 0; i < row.size(); ++i) { m.emplace(std::move(header[i]), std::move(row[i])); @@ -3773,12 +3775,12 @@ class LoadCsvCursor : public Cursor { // pulling MATCH). if (!input_is_once_ && !input_pulled) return false; - if (auto row = reader_->GetNextRow()) { + if (auto row = reader_->GetNextRow(context.evaluation_context.memory)) { if (!reader_->HasHeader()) { - frame[self_->row_var_] = CsvRowToTypedList(std::move(*row), context.evaluation_context.memory); + frame[self_->row_var_] = CsvRowToTypedList(std::move(*row)); } else { - frame[self_->row_var_] = - CsvRowToTypedMap(std::move(*row), *reader_->GetHeader(), context.evaluation_context.memory); + frame[self_->row_var_] = CsvRowToTypedMap( + std::move(*row), csv::Reader::Header(reader_->GetHeader(), context.evaluation_context.memory)); } return true; } @@ -3800,12 +3802,15 @@ class LoadCsvCursor : public Cursor { auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_); auto maybe_quote = ToOptionalString(&evaluator, self_->quote_); - // no need to check if maybe_file is std::nullopt, as the parser makes sure - // we can't get a nullptr for the 'file_' member in the LoadCsv clause + // No need to check if maybe_file is std::nullopt, as the parser makes sure + // we can't get a nullptr for the 'file_' member in the LoadCsv clause. + // Note that the reader has to be given its own memory resource, as it + // persists between pulls, so it can't use the evalutation context memory + // resource. return csv::Reader( *maybe_file, csv::Reader::Config(self_->with_header_, self_->ignore_bad_, std::move(maybe_delim), std::move(maybe_quote)), - eval_context->memory); + utils::NewDeleteResource()); } }; diff --git a/src/utils/csv_parsing.cpp b/src/utils/csv_parsing.cpp index 61d539b47..d3843b795 100644 --- a/src/utils/csv_parsing.cpp +++ b/src/utils/csv_parsing.cpp @@ -19,8 +19,8 @@ void Reader::InitializeStream() { } } -std::optional Reader::GetNextLine() { - utils::pmr::string line(memory_); +std::optional Reader::GetNextLine(utils::MemoryResource *mem) { + utils::pmr::string line(mem); if (!std::getline(csv_stream_, line)) { // reached end of file or an I/0 error occurred if (!csv_stream_.good()) { @@ -35,7 +35,7 @@ std::optional Reader::GetNextLine() { Reader::ParsingResult Reader::ParseHeader() { // header must be the very first line in the file MG_ASSERT(line_count_ == 1, fmt::format("Invalid use of {}", __func__)); - return ParseRow(); + return ParseRow(memory_); } void Reader::TryInitializeHeader() { @@ -53,12 +53,12 @@ void Reader::TryInitializeHeader() { } number_of_columns_ = header->size(); - header_ = *header; + header_ = std::move(*header); } [[nodiscard]] bool Reader::HasHeader() const { return read_config_.with_header; } -const std::optional &Reader::GetHeader() const { return header_; } +const Reader::Header &Reader::GetHeader() const { return header_; } namespace { enum class CsvParserState : uint8_t { @@ -70,8 +70,8 @@ enum class CsvParserState : uint8_t { } // namespace -Reader::ParsingResult Reader::ParseRow() { - utils::pmr::vector row(memory_); +Reader::ParsingResult Reader::ParseRow(utils::MemoryResource *mem) { + utils::pmr::vector row(mem); if (number_of_columns_ != 0) { row.reserve(number_of_columns_); } @@ -81,7 +81,7 @@ Reader::ParsingResult Reader::ParseRow() { auto state = CsvParserState::INITIAL_FIELD; do { - const auto maybe_line = GetNextLine(); + const auto maybe_line = GetNextLine(mem); if (!maybe_line) { // The whole file was processed. break; @@ -204,8 +204,8 @@ Reader::ParsingResult Reader::ParseRow() { // making it unreadable; // @throws CsvReadException if a bad row is encountered, and the ignore_bad is set // to 'true' in the Reader::Config. -std::optional Reader::GetNextRow() { - auto row = ParseRow(); +std::optional Reader::GetNextRow(utils::MemoryResource *mem) { + auto row = ParseRow(mem); if (row.HasError()) { if (!read_config_.ignore_bad) { @@ -217,7 +217,7 @@ std::optional Reader::GetNextRow() { if (!csv_stream_.good()) { return std::nullopt; } - row = ParseRow(); + row = ParseRow(mem); } while (row.HasError()); } diff --git a/src/utils/csv_parsing.hpp b/src/utils/csv_parsing.hpp index 8727ab3c6..efe4bf469 100644 --- a/src/utils/csv_parsing.hpp +++ b/src/utils/csv_parsing.hpp @@ -44,7 +44,7 @@ class Reader { Reader() = default; explicit Reader(std::filesystem::path path, Config cfg, utils::MemoryResource *mem = utils::NewDeleteResource()) - : path_(std::move(path)), memory_(mem) { + : memory_(mem), path_(std::move(path)) { read_config_.with_header = cfg.with_header; read_config_.ignore_bad = cfg.ignore_bad; read_config_.delimiter = cfg.delimiter ? std::move(*cfg.delimiter) : utils::pmr::string{",", memory_}; @@ -71,27 +71,27 @@ class Reader { using ParsingResult = utils::BasicResult; [[nodiscard]] bool HasHeader() const; - const std::optional
&GetHeader() const; - std::optional GetNextRow(); + const Header &GetHeader() const; + std::optional GetNextRow(utils::MemoryResource *mem); private: + utils::MemoryResource *memory_; std::filesystem::path path_; std::ifstream csv_stream_; Config read_config_; uint64_t line_count_{1}; uint16_t number_of_columns_{0}; - std::optional
header_{}; - utils::MemoryResource *memory_; + Header header_{memory_}; void InitializeStream(); void TryInitializeHeader(); - std::optional GetNextLine(); + std::optional GetNextLine(utils::MemoryResource *mem); ParsingResult ParseHeader(); - ParsingResult ParseRow(); + ParsingResult ParseRow(utils::MemoryResource *mem); }; } // namespace csv diff --git a/tests/unit/utils_csv_parsing.cpp b/tests/unit/utils_csv_parsing.cpp index f405dfe24..0305e3b05 100644 --- a/tests/unit/utils_csv_parsing.cpp +++ b/tests/unit/utils_csv_parsing.cpp @@ -89,8 +89,8 @@ TEST_F(CsvReaderTest, CommaDelimiter) { csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; auto reader = csv::Reader(filepath, cfg, mem); - auto parsed_row = reader.GetNextRow(); - ASSERT_EQ(parsed_row, ToPmrColumns(columns)); + auto parsed_row = reader.GetNextRow(mem); + ASSERT_EQ(*parsed_row, ToPmrColumns(columns)); } TEST_F(CsvReaderTest, SemicolonDelimiter) { @@ -112,8 +112,8 @@ TEST_F(CsvReaderTest, SemicolonDelimiter) { const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; auto reader = csv::Reader(filepath, cfg, mem); - auto parsed_row = reader.GetNextRow(); - ASSERT_EQ(parsed_row, ToPmrColumns(columns)); + auto parsed_row = reader.GetNextRow(mem); + ASSERT_EQ(*parsed_row, ToPmrColumns(columns)); } TEST_F(CsvReaderTest, SkipBad) { @@ -145,8 +145,8 @@ TEST_F(CsvReaderTest, SkipBad) { const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; auto reader = csv::Reader(filepath, cfg, mem); - auto parsed_row = reader.GetNextRow(); - ASSERT_EQ(parsed_row, ToPmrColumns(columns_good)); + auto parsed_row = reader.GetNextRow(mem); + ASSERT_EQ(*parsed_row, ToPmrColumns(columns_good)); } { @@ -157,7 +157,7 @@ TEST_F(CsvReaderTest, SkipBad) { const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; auto reader = csv::Reader(filepath, cfg, mem); - EXPECT_THROW(reader.GetNextRow(), csv::CsvReadException); + EXPECT_THROW(reader.GetNextRow(mem), csv::CsvReadException); } } @@ -185,8 +185,8 @@ TEST_F(CsvReaderTest, AllRowsValid) { auto reader = csv::Reader(filepath, cfg); const auto pmr_columns = ToPmrColumns(columns); - while (auto parsed_row = reader.GetNextRow()) { - ASSERT_EQ(parsed_row, pmr_columns); + while (auto parsed_row = reader.GetNextRow(mem)) { + ASSERT_EQ(*parsed_row, pmr_columns); } } @@ -213,7 +213,7 @@ TEST_F(CsvReaderTest, SkipAllRows) { const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; auto reader = csv::Reader(filepath, cfg); - auto parsed_row = reader.GetNextRow(); + auto parsed_row = reader.GetNextRow(mem); ASSERT_EQ(parsed_row, std::nullopt); } @@ -244,7 +244,42 @@ TEST_F(CsvReaderTest, WithHeader) { ASSERT_EQ(reader.GetHeader(), pmr_header); const auto pmr_columns = ToPmrColumns(columns); - while (auto parsed_row = reader.GetNextRow()) { - ASSERT_EQ(parsed_row, pmr_columns); + while (auto parsed_row = reader.GetNextRow(mem)) { + ASSERT_EQ(*parsed_row, pmr_columns); } } + +TEST_F(CsvReaderTest, MultilineQuotedString) { + // create a file with first row valid and the second row containing a quoted + // string spanning two lines; + // parser should return two valid rows + const auto filepath = csv_directory / "bla.csv"; + auto writer = FileWriter(filepath); + + utils::MemoryResource *mem(utils::NewDeleteResource()); + + const utils::pmr::string delimiter{",", mem}; + const utils::pmr::string quote{"\"", mem}; + + const std::vector first_row{"A", "B", "C"}; + const std::vector multiline_first{"D", "\"E", "\"\"F"}; + const std::vector multiline_second{"G\"", "H"}; + + writer.WriteLine(CreateRow(first_row, delimiter)); + writer.WriteLine(CreateRow(multiline_first, delimiter)); + writer.WriteLine(CreateRow(multiline_second, delimiter)); + + writer.Close(); + + const bool with_header = false; + const bool ignore_bad = true; + const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = csv::Reader(filepath, cfg); + + auto parsed_row = reader.GetNextRow(mem); + ASSERT_EQ(*parsed_row, ToPmrColumns(first_row)); + + const std::vector expected_multiline{"D", "E,\"FG", "H"}; + parsed_row = reader.GetNextRow(mem); + ASSERT_EQ(*parsed_row, ToPmrColumns(expected_multiline)); +}