LOAD CSV implementation fixes ()

* Change how csv::Reader handles memory resources
* Add multiline quoted string test
This commit is contained in:
Josip Seljan 2021-03-24 11:02:55 +01:00 committed by jseljan
parent b3914c6b5d
commit 25eb2c147a
4 changed files with 80 additions and 40 deletions

View File

@ -3713,12 +3713,13 @@ TypedValue EvaluateOptionalExpression(Expression *expression, ExpressionEvaluato
auto ToOptionalString(ExpressionEvaluator *evaluator, Expression *expression) -> std::optional<utils::pmr::string> { auto ToOptionalString(ExpressionEvaluator *evaluator, Expression *expression) -> std::optional<utils::pmr::string> {
const auto evaluated_expr = EvaluateOptionalExpression(expression, evaluator); const auto evaluated_expr = EvaluateOptionalExpression(expression, evaluator);
if (evaluated_expr.IsString()) { if (evaluated_expr.IsString()) {
return utils::pmr::string(evaluated_expr.ValueString(), evaluator->GetMemoryResource()); return utils::pmr::string(evaluated_expr.ValueString(), utils::NewDeleteResource());
} }
return std::nullopt; return std::nullopt;
}; };
TypedValue CsvRowToTypedList(csv::Reader::Row row, utils::MemoryResource *mem) { TypedValue CsvRowToTypedList(csv::Reader::Row row) {
auto *mem = row.get_allocator().GetMemoryResource();
auto typed_columns = utils::pmr::vector<TypedValue>(mem); auto typed_columns = utils::pmr::vector<TypedValue>(mem);
typed_columns.reserve(row.size()); typed_columns.reserve(row.size());
for (auto &column : row) { for (auto &column : row) {
@ -3727,8 +3728,9 @@ TypedValue CsvRowToTypedList(csv::Reader::Row row, utils::MemoryResource *mem) {
return TypedValue(typed_columns, mem); return TypedValue(typed_columns, mem);
} }
TypedValue CsvRowToTypedMap(csv::Reader::Row row, csv::Reader::Header header, utils::MemoryResource *mem) { TypedValue CsvRowToTypedMap(csv::Reader::Row row, csv::Reader::Header header) {
// a valid row has the same number of elements as the header // a valid row has the same number of elements as the header
auto *mem = row.get_allocator().GetMemoryResource();
utils::pmr::map<utils::pmr::string, TypedValue> m(mem); utils::pmr::map<utils::pmr::string, TypedValue> m(mem);
for (auto i = 0; i < row.size(); ++i) { for (auto i = 0; i < row.size(); ++i) {
m.emplace(std::move(header[i]), std::move(row[i])); m.emplace(std::move(header[i]), std::move(row[i]));
@ -3773,12 +3775,12 @@ class LoadCsvCursor : public Cursor {
// pulling MATCH). // pulling MATCH).
if (!input_is_once_ && !input_pulled) return false; if (!input_is_once_ && !input_pulled) return false;
if (auto row = reader_->GetNextRow()) { if (auto row = reader_->GetNextRow(context.evaluation_context.memory)) {
if (!reader_->HasHeader()) { if (!reader_->HasHeader()) {
frame[self_->row_var_] = CsvRowToTypedList(std::move(*row), context.evaluation_context.memory); frame[self_->row_var_] = CsvRowToTypedList(std::move(*row));
} else { } else {
frame[self_->row_var_] = frame[self_->row_var_] = CsvRowToTypedMap(
CsvRowToTypedMap(std::move(*row), *reader_->GetHeader(), context.evaluation_context.memory); std::move(*row), csv::Reader::Header(reader_->GetHeader(), context.evaluation_context.memory));
} }
return true; return true;
} }
@ -3800,12 +3802,15 @@ class LoadCsvCursor : public Cursor {
auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_); auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_);
auto maybe_quote = ToOptionalString(&evaluator, self_->quote_); auto maybe_quote = ToOptionalString(&evaluator, self_->quote_);
// no need to check if maybe_file is std::nullopt, as the parser makes sure // No need to check if maybe_file is std::nullopt, as the parser makes sure
// we can't get a nullptr for the 'file_' member in the LoadCsv clause // we can't get a nullptr for the 'file_' member in the LoadCsv clause.
// Note that the reader has to be given its own memory resource, as it
// persists between pulls, so it can't use the evalutation context memory
// resource.
return csv::Reader( return csv::Reader(
*maybe_file, *maybe_file,
csv::Reader::Config(self_->with_header_, self_->ignore_bad_, std::move(maybe_delim), std::move(maybe_quote)), csv::Reader::Config(self_->with_header_, self_->ignore_bad_, std::move(maybe_delim), std::move(maybe_quote)),
eval_context->memory); utils::NewDeleteResource());
} }
}; };

View File

@ -19,8 +19,8 @@ void Reader::InitializeStream() {
} }
} }
std::optional<utils::pmr::string> Reader::GetNextLine() { std::optional<utils::pmr::string> Reader::GetNextLine(utils::MemoryResource *mem) {
utils::pmr::string line(memory_); utils::pmr::string line(mem);
if (!std::getline(csv_stream_, line)) { if (!std::getline(csv_stream_, line)) {
// reached end of file or an I/0 error occurred // reached end of file or an I/0 error occurred
if (!csv_stream_.good()) { if (!csv_stream_.good()) {
@ -35,7 +35,7 @@ std::optional<utils::pmr::string> Reader::GetNextLine() {
Reader::ParsingResult Reader::ParseHeader() { Reader::ParsingResult Reader::ParseHeader() {
// header must be the very first line in the file // header must be the very first line in the file
MG_ASSERT(line_count_ == 1, fmt::format("Invalid use of {}", __func__)); MG_ASSERT(line_count_ == 1, fmt::format("Invalid use of {}", __func__));
return ParseRow(); return ParseRow(memory_);
} }
void Reader::TryInitializeHeader() { void Reader::TryInitializeHeader() {
@ -53,12 +53,12 @@ void Reader::TryInitializeHeader() {
} }
number_of_columns_ = header->size(); number_of_columns_ = header->size();
header_ = *header; header_ = std::move(*header);
} }
[[nodiscard]] bool Reader::HasHeader() const { return read_config_.with_header; } [[nodiscard]] bool Reader::HasHeader() const { return read_config_.with_header; }
const std::optional<Reader::Header> &Reader::GetHeader() const { return header_; } const Reader::Header &Reader::GetHeader() const { return header_; }
namespace { namespace {
enum class CsvParserState : uint8_t { enum class CsvParserState : uint8_t {
@ -70,8 +70,8 @@ enum class CsvParserState : uint8_t {
} // namespace } // namespace
Reader::ParsingResult Reader::ParseRow() { Reader::ParsingResult Reader::ParseRow(utils::MemoryResource *mem) {
utils::pmr::vector<utils::pmr::string> row(memory_); utils::pmr::vector<utils::pmr::string> row(mem);
if (number_of_columns_ != 0) { if (number_of_columns_ != 0) {
row.reserve(number_of_columns_); row.reserve(number_of_columns_);
} }
@ -81,7 +81,7 @@ Reader::ParsingResult Reader::ParseRow() {
auto state = CsvParserState::INITIAL_FIELD; auto state = CsvParserState::INITIAL_FIELD;
do { do {
const auto maybe_line = GetNextLine(); const auto maybe_line = GetNextLine(mem);
if (!maybe_line) { if (!maybe_line) {
// The whole file was processed. // The whole file was processed.
break; break;
@ -204,8 +204,8 @@ Reader::ParsingResult Reader::ParseRow() {
// making it unreadable; // making it unreadable;
// @throws CsvReadException if a bad row is encountered, and the ignore_bad is set // @throws CsvReadException if a bad row is encountered, and the ignore_bad is set
// to 'true' in the Reader::Config. // to 'true' in the Reader::Config.
std::optional<Reader::Row> Reader::GetNextRow() { std::optional<Reader::Row> Reader::GetNextRow(utils::MemoryResource *mem) {
auto row = ParseRow(); auto row = ParseRow(mem);
if (row.HasError()) { if (row.HasError()) {
if (!read_config_.ignore_bad) { if (!read_config_.ignore_bad) {
@ -217,7 +217,7 @@ std::optional<Reader::Row> Reader::GetNextRow() {
if (!csv_stream_.good()) { if (!csv_stream_.good()) {
return std::nullopt; return std::nullopt;
} }
row = ParseRow(); row = ParseRow(mem);
} while (row.HasError()); } while (row.HasError());
} }

View File

@ -44,7 +44,7 @@ class Reader {
Reader() = default; Reader() = default;
explicit Reader(std::filesystem::path path, Config cfg, utils::MemoryResource *mem = utils::NewDeleteResource()) explicit Reader(std::filesystem::path path, Config cfg, utils::MemoryResource *mem = utils::NewDeleteResource())
: path_(std::move(path)), memory_(mem) { : memory_(mem), path_(std::move(path)) {
read_config_.with_header = cfg.with_header; read_config_.with_header = cfg.with_header;
read_config_.ignore_bad = cfg.ignore_bad; read_config_.ignore_bad = cfg.ignore_bad;
read_config_.delimiter = cfg.delimiter ? std::move(*cfg.delimiter) : utils::pmr::string{",", memory_}; read_config_.delimiter = cfg.delimiter ? std::move(*cfg.delimiter) : utils::pmr::string{",", memory_};
@ -71,27 +71,27 @@ class Reader {
using ParsingResult = utils::BasicResult<ParseError, Row>; using ParsingResult = utils::BasicResult<ParseError, Row>;
[[nodiscard]] bool HasHeader() const; [[nodiscard]] bool HasHeader() const;
const std::optional<Header> &GetHeader() const; const Header &GetHeader() const;
std::optional<Row> GetNextRow(); std::optional<Row> GetNextRow(utils::MemoryResource *mem);
private: private:
utils::MemoryResource *memory_;
std::filesystem::path path_; std::filesystem::path path_;
std::ifstream csv_stream_; std::ifstream csv_stream_;
Config read_config_; Config read_config_;
uint64_t line_count_{1}; uint64_t line_count_{1};
uint16_t number_of_columns_{0}; uint16_t number_of_columns_{0};
std::optional<Header> header_{}; Header header_{memory_};
utils::MemoryResource *memory_;
void InitializeStream(); void InitializeStream();
void TryInitializeHeader(); void TryInitializeHeader();
std::optional<utils::pmr::string> GetNextLine(); std::optional<utils::pmr::string> GetNextLine(utils::MemoryResource *mem);
ParsingResult ParseHeader(); ParsingResult ParseHeader();
ParsingResult ParseRow(); ParsingResult ParseRow(utils::MemoryResource *mem);
}; };
} // namespace csv } // namespace csv

View File

@ -89,8 +89,8 @@ TEST_F(CsvReaderTest, CommaDelimiter) {
csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = csv::Reader(filepath, cfg, mem); auto reader = csv::Reader(filepath, cfg, mem);
auto parsed_row = reader.GetNextRow(); auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(parsed_row, ToPmrColumns(columns)); ASSERT_EQ(*parsed_row, ToPmrColumns(columns));
} }
TEST_F(CsvReaderTest, SemicolonDelimiter) { TEST_F(CsvReaderTest, SemicolonDelimiter) {
@ -112,8 +112,8 @@ TEST_F(CsvReaderTest, SemicolonDelimiter) {
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = csv::Reader(filepath, cfg, mem); auto reader = csv::Reader(filepath, cfg, mem);
auto parsed_row = reader.GetNextRow(); auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(parsed_row, ToPmrColumns(columns)); ASSERT_EQ(*parsed_row, ToPmrColumns(columns));
} }
TEST_F(CsvReaderTest, SkipBad) { TEST_F(CsvReaderTest, SkipBad) {
@ -145,8 +145,8 @@ TEST_F(CsvReaderTest, SkipBad) {
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = csv::Reader(filepath, cfg, mem); auto reader = csv::Reader(filepath, cfg, mem);
auto parsed_row = reader.GetNextRow(); auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(parsed_row, ToPmrColumns(columns_good)); ASSERT_EQ(*parsed_row, ToPmrColumns(columns_good));
} }
{ {
@ -157,7 +157,7 @@ TEST_F(CsvReaderTest, SkipBad) {
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = csv::Reader(filepath, cfg, mem); auto reader = csv::Reader(filepath, cfg, mem);
EXPECT_THROW(reader.GetNextRow(), csv::CsvReadException); EXPECT_THROW(reader.GetNextRow(mem), csv::CsvReadException);
} }
} }
@ -185,8 +185,8 @@ TEST_F(CsvReaderTest, AllRowsValid) {
auto reader = csv::Reader(filepath, cfg); auto reader = csv::Reader(filepath, cfg);
const auto pmr_columns = ToPmrColumns(columns); const auto pmr_columns = ToPmrColumns(columns);
while (auto parsed_row = reader.GetNextRow()) { while (auto parsed_row = reader.GetNextRow(mem)) {
ASSERT_EQ(parsed_row, pmr_columns); ASSERT_EQ(*parsed_row, pmr_columns);
} }
} }
@ -213,7 +213,7 @@ TEST_F(CsvReaderTest, SkipAllRows) {
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = csv::Reader(filepath, cfg); auto reader = csv::Reader(filepath, cfg);
auto parsed_row = reader.GetNextRow(); auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(parsed_row, std::nullopt); ASSERT_EQ(parsed_row, std::nullopt);
} }
@ -244,7 +244,42 @@ TEST_F(CsvReaderTest, WithHeader) {
ASSERT_EQ(reader.GetHeader(), pmr_header); ASSERT_EQ(reader.GetHeader(), pmr_header);
const auto pmr_columns = ToPmrColumns(columns); const auto pmr_columns = ToPmrColumns(columns);
while (auto parsed_row = reader.GetNextRow()) { while (auto parsed_row = reader.GetNextRow(mem)) {
ASSERT_EQ(parsed_row, pmr_columns); ASSERT_EQ(*parsed_row, pmr_columns);
} }
} }
TEST_F(CsvReaderTest, MultilineQuotedString) {
// create a file with first row valid and the second row containing a quoted
// string spanning two lines;
// parser should return two valid rows
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath);
utils::MemoryResource *mem(utils::NewDeleteResource());
const utils::pmr::string delimiter{",", mem};
const utils::pmr::string quote{"\"", mem};
const std::vector<std::string> first_row{"A", "B", "C"};
const std::vector<std::string> multiline_first{"D", "\"E", "\"\"F"};
const std::vector<std::string> multiline_second{"G\"", "H"};
writer.WriteLine(CreateRow(first_row, delimiter));
writer.WriteLine(CreateRow(multiline_first, delimiter));
writer.WriteLine(CreateRow(multiline_second, delimiter));
writer.Close();
const bool with_header = false;
const bool ignore_bad = true;
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = csv::Reader(filepath, cfg);
auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(*parsed_row, ToPmrColumns(first_row));
const std::vector<std::string> expected_multiline{"D", "E,\"FG", "H"};
parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(*parsed_row, ToPmrColumns(expected_multiline));
}