Add LOAD CSV clause infrastructure (#101)
* Add LOAD CSV clause infrastructure * Add LoadCsv operator * Update csv::Reader class * Support csv files with and without header Co-authored-by: jseljan <josip.seljan@memgraph.io>
This commit is contained in:
parent
f6d5f576d5
commit
f950a91732
@ -43,6 +43,8 @@ std::string PermissionToString(Permission permission) {
|
||||
return "REPLICATION";
|
||||
case Permission::LOCK_PATH:
|
||||
return "LOCK_PATH";
|
||||
case Permission::READ_FILE:
|
||||
return "READ_FILE";
|
||||
case Permission::AUTH:
|
||||
return "AUTH";
|
||||
}
|
||||
|
@ -23,15 +23,16 @@ enum class Permission : uint64_t {
|
||||
DUMP = 1U << 9U,
|
||||
REPLICATION = 1U << 10U,
|
||||
LOCK_PATH = 1U << 11U,
|
||||
READ_FILE = 1U << 12U,
|
||||
AUTH = 1U << 16U
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
// Constant list of all available permissions.
|
||||
const std::vector<Permission> kPermissionsAll = {
|
||||
Permission::MATCH, Permission::CREATE, Permission::MERGE, Permission::DELETE, Permission::SET,
|
||||
Permission::REMOVE, Permission::INDEX, Permission::STATS, Permission::CONSTRAINT, Permission::DUMP,
|
||||
Permission::AUTH, Permission::REPLICATION, Permission::LOCK_PATH};
|
||||
Permission::MATCH, Permission::CREATE, Permission::MERGE, Permission::DELETE, Permission::SET,
|
||||
Permission::REMOVE, Permission::INDEX, Permission::STATS, Permission::CONSTRAINT, Permission::DUMP,
|
||||
Permission::AUTH, Permission::REPLICATION, Permission::LOCK_PATH, Permission::READ_FILE};
|
||||
|
||||
// Function that converts a permission to its string representation.
|
||||
std::string PermissionToString(Permission permission);
|
||||
|
@ -28,6 +28,8 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) {
|
||||
return auth::Permission::REPLICATION;
|
||||
case query::AuthQuery::Privilege::LOCK_PATH:
|
||||
return auth::Permission::LOCK_PATH;
|
||||
case query::AuthQuery::Privilege::READ_FILE:
|
||||
return auth::Permission::READ_FILE;
|
||||
case query::AuthQuery::Privilege::AUTH:
|
||||
return auth::Permission::AUTH;
|
||||
}
|
||||
|
@ -2191,7 +2191,7 @@ cpp<#
|
||||
(:serialize))
|
||||
(lcp:define-enum privilege
|
||||
(create delete match merge set remove index stats auth constraint
|
||||
dump replication lock_path)
|
||||
dump replication lock_path read_file)
|
||||
(:serialize))
|
||||
#>cpp
|
||||
AuthQuery() = default;
|
||||
@ -2353,4 +2353,45 @@ cpp<#
|
||||
(:serialize (:slk))
|
||||
(:clone))
|
||||
|
||||
(lcp:define-class load-csv (clause)
|
||||
((file "Expression *" :scope :public)
|
||||
(with_header "bool" :scope :public)
|
||||
(ignore_bad "bool" :scope :public)
|
||||
(delimiter "Expression *" :initval "nullptr" :scope :public)
|
||||
(quote "Expression *" :initval "nullptr" :scope :public)
|
||||
(row_var "Identifier *" :initval "nullptr" :scope :public
|
||||
:slk-save #'slk-save-ast-pointer
|
||||
:slk-load (slk-load-ast-pointer "Identifier")))
|
||||
|
||||
(:public
|
||||
#>cpp
|
||||
LoadCsv() = default;
|
||||
|
||||
bool Accept(HierarchicalTreeVisitor &visitor) override {
|
||||
if (visitor.PreVisit(*this)) {
|
||||
row_var_->Accept(visitor);
|
||||
}
|
||||
return visitor.PostVisit(*this);
|
||||
}
|
||||
cpp<#)
|
||||
(:protected
|
||||
#>cpp
|
||||
explicit LoadCsv(Expression *file, bool with_header, bool ignore_bad, Expression *delimiter,
|
||||
Expression* quote, Identifier* row_var)
|
||||
: file_(file),
|
||||
with_header_(with_header),
|
||||
ignore_bad_(ignore_bad),
|
||||
delimiter_(delimiter),
|
||||
quote_(quote),
|
||||
row_var_(row_var) {
|
||||
DMG_ASSERT(row_var, "LoadCsv cannot take nullptr for identifier");
|
||||
}
|
||||
cpp<#)
|
||||
(:private
|
||||
#>cpp
|
||||
friend class AstStorage;
|
||||
cpp<#)
|
||||
(:serialize (:slk))
|
||||
(:clone))
|
||||
|
||||
(lcp:pop-namespace) ;; namespace query
|
||||
|
@ -74,6 +74,7 @@ class RegexMatch;
|
||||
class DumpQuery;
|
||||
class ReplicationQuery;
|
||||
class LockPathQuery;
|
||||
class LoadCsv;
|
||||
|
||||
using TreeCompositeVisitor = ::utils::CompositeVisitor<
|
||||
SingleQuery, CypherUnion, NamedExpression, OrOperator, XorOperator, AndOperator, NotOperator, AdditionOperator,
|
||||
@ -82,7 +83,7 @@ using TreeCompositeVisitor = ::utils::CompositeVisitor<
|
||||
ListSlicingOperator, IfOperator, UnaryPlusOperator, UnaryMinusOperator, IsNullOperator, ListLiteral, MapLiteral,
|
||||
PropertyLookup, LabelsTest, Aggregation, Function, Reduce, Coalesce, Extract, All, Single, Any, None, CallProcedure,
|
||||
Create, Match, Return, With, Pattern, NodeAtom, EdgeAtom, Delete, Where, SetProperty, SetProperties, SetLabels,
|
||||
RemoveProperty, RemoveLabels, Merge, Unwind, RegexMatch>;
|
||||
RemoveProperty, RemoveLabels, Merge, Unwind, RegexMatch, LoadCsv>;
|
||||
|
||||
using TreeLeafVisitor = ::utils::LeafVisitor<Identifier, PrimitiveLiteral, ParameterLookup>;
|
||||
|
||||
@ -105,7 +106,8 @@ class ExpressionVisitor
|
||||
None, ParameterLookup, Identifier, PrimitiveLiteral, RegexMatch> {};
|
||||
|
||||
template <class TResult>
|
||||
class QueryVisitor : public ::utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery, IndexQuery, AuthQuery,
|
||||
InfoQuery, ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery> {};
|
||||
class QueryVisitor
|
||||
: public ::utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery, IndexQuery, AuthQuery, InfoQuery,
|
||||
ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery, LoadCsv> {};
|
||||
|
||||
} // namespace query
|
||||
|
@ -263,6 +263,47 @@ antlrcpp::Any CypherMainVisitor::visitLockPathQuery(MemgraphCypher::LockPathQuer
|
||||
return lock_query;
|
||||
}
|
||||
|
||||
antlrcpp::Any CypherMainVisitor::visitLoadCsv(MemgraphCypher::LoadCsvContext *ctx) {
|
||||
auto *load_csv = storage_->Create<LoadCsv>();
|
||||
// handle file name
|
||||
if (ctx->csvFile()->literal()->StringLiteral()) {
|
||||
load_csv->file_ = ctx->csvFile()->accept(this);
|
||||
} else {
|
||||
throw SemanticException("CSV file path should be a string literal");
|
||||
}
|
||||
|
||||
// handle header options
|
||||
// Don't have to check for ctx->HEADER(), as it's a mandatory token.
|
||||
// Just need to check if ctx->WITH() is not nullptr - otherwise, we have a
|
||||
// ctx->NO() and ctx->HEADER() present.
|
||||
load_csv->with_header_ = ctx->WITH() != nullptr;
|
||||
|
||||
// handle skip bad row option
|
||||
load_csv->ignore_bad_ = ctx->IGNORE() && ctx->BAD();
|
||||
|
||||
// handle delimiter
|
||||
if (ctx->DELIMITER()) {
|
||||
if (ctx->delimiter()->literal()->StringLiteral()) {
|
||||
load_csv->delimiter_ = ctx->delimiter()->accept(this);
|
||||
} else {
|
||||
throw SemanticException("Delimiter should be a string literal");
|
||||
}
|
||||
}
|
||||
|
||||
// handle quote
|
||||
if (ctx->QUOTE()) {
|
||||
if (ctx->quote()->literal()->StringLiteral()) {
|
||||
load_csv->quote_ = ctx->quote()->accept(this);
|
||||
} else {
|
||||
throw SemanticException("Quote should be a string literal");
|
||||
}
|
||||
}
|
||||
|
||||
// handle row variable
|
||||
load_csv->row_var_ = storage_->Create<Identifier>(ctx->rowVar()->variable()->accept(this).as<std::string>());
|
||||
return load_csv;
|
||||
}
|
||||
|
||||
antlrcpp::Any CypherMainVisitor::visitCypherUnion(MemgraphCypher::CypherUnionContext *ctx) {
|
||||
bool distinct = !ctx->ALL();
|
||||
auto *cypher_union = storage_->Create<CypherUnion>(distinct);
|
||||
@ -292,6 +333,7 @@ antlrcpp::Any CypherMainVisitor::visitSingleQuery(MemgraphCypher::SingleQueryCon
|
||||
bool has_return = false;
|
||||
bool has_optional_match = false;
|
||||
bool has_call_procedure = false;
|
||||
bool has_load_csv = false;
|
||||
|
||||
for (Clause *clause : single_query->clauses_) {
|
||||
const auto &clause_type = clause->GetTypeInfo();
|
||||
@ -304,6 +346,14 @@ antlrcpp::Any CypherMainVisitor::visitSingleQuery(MemgraphCypher::SingleQueryCon
|
||||
if (has_update || has_return) {
|
||||
throw SemanticException("UNWIND can't be put after RETURN clause or after an update.");
|
||||
}
|
||||
} else if (utils::IsSubtype(clause_type, LoadCsv::kType)) {
|
||||
if (has_load_csv) {
|
||||
throw SemanticException("Can't have multiple LOAD CSV clauses in a single query.");
|
||||
}
|
||||
if (has_return) {
|
||||
throw SemanticException("LOAD CSV can't be put after RETURN clause.");
|
||||
}
|
||||
has_load_csv = true;
|
||||
} else if (auto *match = utils::Downcast<Match>(clause)) {
|
||||
if (has_update || has_return) {
|
||||
throw SemanticException("MATCH can't be put after RETURN clause or after an update.");
|
||||
@ -388,6 +438,9 @@ antlrcpp::Any CypherMainVisitor::visitClause(MemgraphCypher::ClauseContext *ctx)
|
||||
if (ctx->callProcedure()) {
|
||||
return static_cast<Clause *>(ctx->callProcedure()->accept(this).as<CallProcedure *>());
|
||||
}
|
||||
if (ctx->loadCsv()) {
|
||||
return static_cast<Clause *>(ctx->loadCsv()->accept(this).as<LoadCsv *>());
|
||||
}
|
||||
// TODO: implement other clauses.
|
||||
throw utils::NotYetImplemented("clause '{}'", ctx->getText());
|
||||
return 0;
|
||||
|
@ -208,6 +208,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
|
||||
*/
|
||||
antlrcpp::Any visitLockPathQuery(MemgraphCypher::LockPathQueryContext *ctx) override;
|
||||
|
||||
/**
|
||||
* @return LoadCsvQuery*
|
||||
*/
|
||||
antlrcpp::Any visitLoadCsv(MemgraphCypher::LoadCsvContext *ctx) override;
|
||||
|
||||
/**
|
||||
* @return CypherUnion*
|
||||
*/
|
||||
|
@ -10,7 +10,11 @@ memgraphCypherKeyword : cypherKeyword
|
||||
| ALTER
|
||||
| ASYNC
|
||||
| AUTH
|
||||
| BAD
|
||||
| CLEAR
|
||||
| CSV
|
||||
| DATA
|
||||
| DELIMITER
|
||||
| DATABASE
|
||||
| DENY
|
||||
| DROP
|
||||
@ -18,10 +22,13 @@ memgraphCypherKeyword : cypherKeyword
|
||||
| FOR
|
||||
| FROM
|
||||
| GRANT
|
||||
| HEADER
|
||||
| IDENTIFIED
|
||||
| LOAD
|
||||
| LOCK
|
||||
| MAIN
|
||||
| MODE
|
||||
| NO
|
||||
| PASSWORD
|
||||
| PORT
|
||||
| PRIVILEGES
|
||||
@ -32,6 +39,7 @@ memgraphCypherKeyword : cypherKeyword
|
||||
| REVOKE
|
||||
| ROLE
|
||||
| ROLES
|
||||
| QUOTE
|
||||
| STATS
|
||||
| SYNC
|
||||
| TIMEOUT
|
||||
@ -82,6 +90,33 @@ replicationQuery : setReplicationRole
|
||||
| showReplicas
|
||||
;
|
||||
|
||||
clause : cypherMatch
|
||||
| unwind
|
||||
| merge
|
||||
| create
|
||||
| set
|
||||
| cypherDelete
|
||||
| remove
|
||||
| with
|
||||
| cypherReturn
|
||||
| callProcedure
|
||||
| loadCsv
|
||||
;
|
||||
|
||||
loadCsv : LOAD CSV FROM csvFile ( WITH | NO ) HEADER
|
||||
( IGNORE BAD ) ?
|
||||
( DELIMITER delimiter ) ?
|
||||
( QUOTE quote ) ?
|
||||
AS rowVar ;
|
||||
|
||||
csvFile : literal ;
|
||||
|
||||
delimiter : literal ;
|
||||
|
||||
quote : literal ;
|
||||
|
||||
rowVar : variable ;
|
||||
|
||||
userOrRoleName : symbolicName ;
|
||||
|
||||
createRole : CREATE ROLE role=userOrRoleName ;
|
||||
@ -141,3 +176,4 @@ showReplicas : SHOW REPLICAS ;
|
||||
|
||||
lockPathQuery : ( LOCK | UNLOCK ) DATA DIRECTORY ;
|
||||
|
||||
|
||||
|
@ -13,8 +13,11 @@ import CypherLexer ;
|
||||
ALTER : A L T E R ;
|
||||
ASYNC : A S Y N C ;
|
||||
AUTH : A U T H ;
|
||||
BAD : B A D ;
|
||||
CLEAR : C L E A R ;
|
||||
CSV : C S V ;
|
||||
DATA : D A T A ;
|
||||
DELIMITER : D E L I M I T E R ;
|
||||
DATABASE : D A T A B A S E ;
|
||||
DENY : D E N Y ;
|
||||
DIRECTORY : D I R E C T O R Y ;
|
||||
@ -24,10 +27,14 @@ FOR : F O R ;
|
||||
FROM : F R O M ;
|
||||
GRANT : G R A N T ;
|
||||
GRANTS : G R A N T S ;
|
||||
HEADER : H E A D E R ;
|
||||
IDENTIFIED : I D E N T I F I E D ;
|
||||
IGNORE : I G N O R E ;
|
||||
LOAD : L O A D ;
|
||||
LOCK : L O C K ;
|
||||
MAIN : M A I N ;
|
||||
MODE : M O D E ;
|
||||
NO : N O ;
|
||||
PASSWORD : P A S S W O R D ;
|
||||
PORT : P O R T ;
|
||||
PRIVILEGES : P R I V I L E G E S ;
|
||||
@ -38,6 +45,7 @@ REPLICATION : R E P L I C A T I O N ;
|
||||
REVOKE : R E V O K E ;
|
||||
ROLE : R O L E ;
|
||||
ROLES : R O L E S ;
|
||||
QUOTE : Q U O T E ;
|
||||
STATS : S T A T S ;
|
||||
SYNC : S Y N C ;
|
||||
TIMEOUT : T I M E O U T ;
|
||||
|
@ -50,6 +50,8 @@ class PrivilegeExtractor : public QueryVisitor<void>, public HierarchicalTreeVis
|
||||
|
||||
void Visit(LockPathQuery &lock_path_query) override { AddPrivilege(AuthQuery::Privilege::LOCK_PATH); }
|
||||
|
||||
void Visit(LoadCsv &load_csv) override { AddPrivilege(AuthQuery::Privilege::READ_FILE); }
|
||||
|
||||
void Visit(ReplicationQuery &replication_query) override {
|
||||
switch (replication_query.action_) {
|
||||
case ReplicationQuery::Action::SET_REPLICATION_ROLE:
|
||||
|
@ -162,6 +162,16 @@ bool SymbolGenerator::PostVisit(CallProcedure &call_proc) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SymbolGenerator::PreVisit(LoadCsv &load_csv) { return false; }
|
||||
|
||||
bool SymbolGenerator::PostVisit(LoadCsv &load_csv) {
|
||||
if (HasSymbol(load_csv.row_var_->name_)) {
|
||||
throw RedeclareVariableError(load_csv.row_var_->name_);
|
||||
}
|
||||
load_csv.row_var_->MapTo(CreateSymbol(load_csv.row_var_->name_, true));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SymbolGenerator::PreVisit(Return &ret) {
|
||||
scope_.in_return = true;
|
||||
VisitReturnBody(ret.body_);
|
||||
|
@ -36,6 +36,8 @@ class SymbolGenerator : public HierarchicalTreeVisitor {
|
||||
bool PostVisit(Create &) override;
|
||||
bool PreVisit(CallProcedure &) override;
|
||||
bool PostVisit(CallProcedure &) override;
|
||||
bool PreVisit(LoadCsv &) override;
|
||||
bool PostVisit(LoadCsv &) override;
|
||||
bool PreVisit(Return &) override;
|
||||
bool PostVisit(Return &) override;
|
||||
bool PreVisit(With &) override;
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "query/plan/vertex_count_cache.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/csv_parsing.hpp"
|
||||
#include "utils/event_counter.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
@ -78,7 +79,7 @@ struct ParsedQuery {
|
||||
ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> ¶ms,
|
||||
utils::SkipList<QueryCacheEntry> *cache, utils::SpinLock *antlr_lock) {
|
||||
// Strip the query for caching purposes. The process of stripping a query
|
||||
// "normalizes" it by replacing any literals with new parameters . This
|
||||
// "normalizes" it by replacing any literals with new parameters. This
|
||||
// results in just the *structure* of the query being taken into account for
|
||||
// caching.
|
||||
frontend::StrippedQuery stripped_query{query_string};
|
||||
@ -473,6 +474,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, ReplQueryHandler *
|
||||
Frame frame(0);
|
||||
SymbolTable symbol_table;
|
||||
EvaluationContext evaluation_context;
|
||||
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
|
||||
// the argument to Callback.
|
||||
evaluation_context.timestamp =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
|
||||
.count();
|
||||
@ -713,6 +716,7 @@ std::optional<ExecutionContext> PullPlan::Pull(AnyStream *stream, std::optional<
|
||||
ctx_.profile_execution_time = execution_time_;
|
||||
return ctx_;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
/**
|
||||
@ -1093,7 +1097,7 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa
|
||||
RWType::NONE};
|
||||
}
|
||||
|
||||
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
|
||||
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
|
||||
InterpreterContext *interpreter_context, DbAccessor *dba) {
|
||||
if (in_explicit_transaction) {
|
||||
throw ReplicationModificationInMulticommandTxException();
|
||||
|
@ -24,9 +24,11 @@
|
||||
#include "query/procedure/mg_procedure_impl.hpp"
|
||||
#include "query/procedure/module.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/csv_parsing.hpp"
|
||||
#include "utils/event_counter.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/fnv.hpp"
|
||||
#include "utils/likely.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/pmr/unordered_map.hpp"
|
||||
#include "utils/pmr/unordered_set.hpp"
|
||||
@ -1794,8 +1796,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
if (!input_cursor_->Pull(frame, context)) return false;
|
||||
|
||||
// Delete should get the latest information, this way it is also possible
|
||||
// to
|
||||
// delete newly added nodes and edges.
|
||||
// to delete newly added nodes and edges.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
|
||||
storage::View::NEW);
|
||||
auto *pull_memory = context.evaluation_context.memory;
|
||||
@ -3679,4 +3680,134 @@ UniqueCursorPtr CallProcedure::MakeCursor(utils::MemoryResource *mem) const {
|
||||
return MakeUniqueCursorPtr<CallProcedureCursor>(mem, this, mem);
|
||||
}
|
||||
|
||||
LoadCsv::LoadCsv(std::shared_ptr<LogicalOperator> input, Expression *file, bool with_header, bool ignore_bad,
|
||||
Expression *delimiter, Expression *quote, Symbol row_var)
|
||||
: input_(input ? input : (std::make_shared<Once>())),
|
||||
file_(file),
|
||||
with_header_(with_header),
|
||||
ignore_bad_(ignore_bad),
|
||||
delimiter_(delimiter),
|
||||
quote_(quote),
|
||||
row_var_(row_var) {
|
||||
MG_ASSERT(file_, "Something went wrong - '{}' member file_ shouldn't be a nullptr", __func__);
|
||||
}
|
||||
|
||||
bool LoadCsv::Accept(HierarchicalLogicalOperatorVisitor &visitor) { return false; };
|
||||
|
||||
class LoadCsvCursor;
|
||||
|
||||
std::vector<Symbol> LoadCsv::OutputSymbols(const SymbolTable &sym_table) const { return {row_var_}; };
|
||||
|
||||
std::vector<Symbol> LoadCsv::ModifiedSymbols(const SymbolTable &sym_table) const {
|
||||
auto symbols = input_->ModifiedSymbols(sym_table);
|
||||
symbols.push_back(row_var_);
|
||||
return symbols;
|
||||
};
|
||||
|
||||
namespace {
|
||||
// copy-pasted from interpreter.cpp
|
||||
TypedValue EvaluateOptionalExpression(Expression *expression, ExpressionEvaluator *eval) {
|
||||
return expression ? expression->Accept(*eval) : TypedValue();
|
||||
}
|
||||
|
||||
auto ToOptionalString(ExpressionEvaluator *evaluator, Expression *expression) -> std::optional<utils::pmr::string> {
|
||||
const auto evaluated_expr = EvaluateOptionalExpression(expression, evaluator);
|
||||
if (evaluated_expr.IsString()) {
|
||||
return utils::pmr::string(evaluated_expr.ValueString(), evaluator->GetMemoryResource());
|
||||
}
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
TypedValue CsvRowToTypedList(csv::Reader::Row row, utils::MemoryResource *mem) {
|
||||
auto typed_columns = utils::pmr::vector<TypedValue>(mem);
|
||||
std::transform(begin(row), end(row), std::back_inserter(typed_columns),
|
||||
[mem = mem](auto &column) { return TypedValue(column, mem); });
|
||||
return TypedValue(typed_columns, mem);
|
||||
}
|
||||
|
||||
TypedValue CsvRowToTypedMap(csv::Reader::Row row, csv::Reader::Header header, utils::MemoryResource *mem) {
|
||||
// a valid row has the same number of elements as the header
|
||||
utils::pmr::map<utils::pmr::string, TypedValue> m(mem);
|
||||
for (auto i = 0; i < row.size(); ++i) {
|
||||
m.emplace(header[i], TypedValue(row[i], mem));
|
||||
}
|
||||
return TypedValue(m, mem);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
class LoadCsvCursor : public Cursor {
|
||||
const LoadCsv *self_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
bool input_is_once_;
|
||||
std::optional<csv::Reader> reader_{};
|
||||
|
||||
public:
|
||||
LoadCsvCursor(const LoadCsv *self, utils::MemoryResource *mem)
|
||||
: self_(self), input_cursor_(self_->input_->MakeCursor(mem)) {
|
||||
input_is_once_ = dynamic_cast<Once *>(self_->input_.get());
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("LoadCsv");
|
||||
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
|
||||
// ToDo(the-joksim):
|
||||
// - this is an ungodly hack because the pipeline of creating a plan
|
||||
// doesn't allow evaluating the expressions contained in self_->file_,
|
||||
// self_->delimiter_, and self_->quote_ earlier (say, in the interpreter.cpp)
|
||||
// without massacring the code even worse than I did here
|
||||
if (UNLIKELY(!reader_)) {
|
||||
reader_ = MakeReader(&context.evaluation_context);
|
||||
}
|
||||
|
||||
bool input_pulled = input_cursor_->Pull(frame, context);
|
||||
|
||||
// If the input is Once, we have to keep going until we read all the rows,
|
||||
// regardless of whether the pull on Once returned false.
|
||||
// If we have e.g. MATCH(n) LOAD CSV ... AS x SET n.name = x.name, then we
|
||||
// have to read at most cardinality(n) rows (but we can read less and stop
|
||||
// pulling MATCH).
|
||||
if (!input_is_once_ && !input_pulled) return false;
|
||||
|
||||
if (auto row = reader_->GetNextRow()) {
|
||||
if (!reader_->HasHeader()) {
|
||||
frame[self_->row_var_] = CsvRowToTypedList(std::move(*row), context.evaluation_context.memory);
|
||||
} else {
|
||||
frame[self_->row_var_] =
|
||||
CsvRowToTypedMap(std::move(*row), *reader_->GetHeader(), context.evaluation_context.memory);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void Reset() override { input_cursor_->Reset(); }
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
private:
|
||||
csv::Reader MakeReader(EvaluationContext *eval_context) {
|
||||
Frame frame(0);
|
||||
SymbolTable symbol_table;
|
||||
DbAccessor *dba = nullptr;
|
||||
auto evaluator = ExpressionEvaluator(&frame, symbol_table, *eval_context, dba, storage::View::OLD);
|
||||
|
||||
auto maybe_file = ToOptionalString(&evaluator, self_->file_);
|
||||
auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_);
|
||||
auto maybe_quote = ToOptionalString(&evaluator, self_->quote_);
|
||||
|
||||
// no need to check if maybe_file is std::nullopt, as the parser makes sure
|
||||
// we can't get a nullptr for the 'file_' member in the LoadCsv clause
|
||||
return csv::Reader(*maybe_file,
|
||||
csv::Reader::Config(self_->with_header_, self_->ignore_bad_, maybe_delim, maybe_quote),
|
||||
eval_context->memory);
|
||||
}
|
||||
};
|
||||
|
||||
UniqueCursorPtr LoadCsv::MakeCursor(utils::MemoryResource *mem) const {
|
||||
return MakeUniqueCursorPtr<LoadCsvCursor>(mem, this, mem);
|
||||
};
|
||||
|
||||
} // namespace query::plan
|
||||
|
@ -117,6 +117,7 @@ class Distinct;
|
||||
class Union;
|
||||
class Cartesian;
|
||||
class CallProcedure;
|
||||
class LoadCsv;
|
||||
|
||||
using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor<
|
||||
Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel,
|
||||
@ -125,7 +126,7 @@ using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor<
|
||||
Expand, ExpandVariable, ConstructNamedPath, Filter, Produce, Delete,
|
||||
SetProperty, SetProperties, SetLabels, RemoveProperty, RemoveLabels,
|
||||
EdgeUniquenessFilter, Accumulate, Aggregate, Skip, Limit, OrderBy, Merge,
|
||||
Optional, Unwind, Distinct, Union, Cartesian, CallProcedure>;
|
||||
Optional, Unwind, Distinct, Union, Cartesian, CallProcedure, LoadCsv>;
|
||||
|
||||
using LogicalOperatorLeafVisitor = ::utils::LeafVisitor<Once>;
|
||||
|
||||
@ -2156,5 +2157,38 @@ at once. Instead, each call of the callback should return a single row of the ta
|
||||
(:serialize (:slk))
|
||||
(:clone))
|
||||
|
||||
(lcp:define-class load-csv (logical-operator)
|
||||
((input "std::shared_ptr<LogicalOperator>" :scope :public
|
||||
:slk-save #'slk-save-operator-pointer
|
||||
:slk-load #'slk-load-operator-pointer)
|
||||
(file "Expression *" :scope :public)
|
||||
(with_header "bool" :scope :public)
|
||||
(ignore_bad "bool" :scope :public)
|
||||
(delimiter "Expression *" :initval "nullptr" :scope :public
|
||||
:slk-save #'slk-save-ast-pointer
|
||||
:slk-load (slk-load-ast-pointer "Expression"))
|
||||
(quote "Expression *" :initval "nullptr" :scope :public
|
||||
:slk-save #'slk-save-ast-pointer
|
||||
:slk-load (slk-load-ast-pointer "Expression"))
|
||||
(row_var "Symbol" :scope :public))
|
||||
(:public
|
||||
#>cpp
|
||||
LoadCsv() = default;
|
||||
LoadCsv(std::shared_ptr<LogicalOperator> input, Expression *file, bool with_header, bool ignore_bad,
|
||||
Expression* delimiter, Expression* quote, Symbol row_var);
|
||||
bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
|
||||
UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override;
|
||||
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
|
||||
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
|
||||
|
||||
bool HasSingleInput() const override { return true; }
|
||||
std::shared_ptr<LogicalOperator> input() const override { return input_; }
|
||||
void set_input(std::shared_ptr<LogicalOperator> input) override {
|
||||
input_ = input;
|
||||
}
|
||||
cpp<#)
|
||||
(:serialize (:slk))
|
||||
(:clone))
|
||||
|
||||
(lcp:pop-namespace) ;; plan
|
||||
(lcp:pop-namespace) ;; query
|
||||
|
@ -206,6 +206,11 @@ bool PlanPrinter::PreVisit(query::plan::CallProcedure &op) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PlanPrinter::PreVisit(query::plan::LoadCsv &op) {
|
||||
WithPrintLn([&op](auto &out) { out << "* LoadCsv {" << op.row_var_.name() << "}"; });
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PlanPrinter::Visit(query::plan::Once &op) {
|
||||
WithPrintLn([](auto &out) { out << "* Once"; });
|
||||
return true;
|
||||
@ -803,6 +808,23 @@ bool PlanToJsonVisitor::PreVisit(query::plan::CallProcedure &op) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PlanToJsonVisitor::PreVisit(query::plan::LoadCsv &op) {
|
||||
json self;
|
||||
self["name"] = "LoadCsv";
|
||||
self["file"] = ToJson(op.file_);
|
||||
self["with_header"] = op.with_header_;
|
||||
self["ignore_bad"] = op.ignore_bad_;
|
||||
self["delimiter"] = ToJson(op.delimiter_);
|
||||
self["quote"] = ToJson(op.quote_);
|
||||
self["row_variable"] = ToJson(op.row_var_);
|
||||
|
||||
op.input_->Accept(*this);
|
||||
self["input"] = PopOutput();
|
||||
|
||||
output_ = std::move(self);
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PlanToJsonVisitor::PreVisit(Distinct &op) {
|
||||
json self;
|
||||
self["name"] = "Distinct";
|
||||
|
@ -81,6 +81,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
|
||||
|
||||
bool PreVisit(Unwind &) override;
|
||||
bool PreVisit(CallProcedure &) override;
|
||||
bool PreVisit(LoadCsv &) override;
|
||||
|
||||
bool Visit(Once &) override;
|
||||
|
||||
@ -194,6 +195,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
|
||||
|
||||
bool PreVisit(Unwind &) override;
|
||||
bool PreVisit(CallProcedure &) override;
|
||||
bool PreVisit(LoadCsv &) override;
|
||||
|
||||
bool Visit(Once &) override;
|
||||
|
||||
|
@ -203,6 +203,13 @@ class RuleBasedPlanner {
|
||||
input_op = std::make_unique<plan::CallProcedure>(
|
||||
std::move(input_op), call_proc->procedure_name_, call_proc->arguments_, call_proc->result_fields_,
|
||||
result_symbols, call_proc->memory_limit_, call_proc->memory_scale_);
|
||||
} else if (auto *load_csv = utils::Downcast<query::LoadCsv>(clause)) {
|
||||
const auto &row_sym = context.symbol_table->at(*load_csv->row_var_);
|
||||
context.bound_symbols.insert(row_sym);
|
||||
|
||||
input_op =
|
||||
std::make_unique<plan::LoadCsv>(std::move(input_op), load_csv->file_, load_csv->with_header_,
|
||||
load_csv->ignore_bad_, load_csv->delimiter_, load_csv->quote_, row_sym);
|
||||
} else {
|
||||
throw utils::NotYetImplemented("clause '{}' conversion to operator(s)", clause->GetTypeInfo().name);
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
set(utils_src_files
|
||||
event_counter.cpp
|
||||
csv_parsing.cpp
|
||||
file.cpp
|
||||
file_locker.cpp
|
||||
memory.cpp
|
||||
|
@ -19,8 +19,8 @@ void Reader::InitializeStream() {
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::string> Reader::GetNextLine() {
|
||||
std::string line;
|
||||
std::optional<utils::pmr::string> Reader::GetNextLine() {
|
||||
utils::pmr::string line(memory_);
|
||||
if (!std::getline(csv_stream_, line)) {
|
||||
// reached end of file or an I/0 error occurred
|
||||
if (!csv_stream_.good()) {
|
||||
@ -32,18 +32,34 @@ std::optional<std::string> Reader::GetNextLine() {
|
||||
return line;
|
||||
}
|
||||
|
||||
std::optional<Reader::Header> Reader::ParseHeader() {
|
||||
Reader::ParsingResult Reader::ParseHeader() {
|
||||
// header must be the very first line in the file
|
||||
MG_ASSERT(line_count_ == 1, fmt::format("Invalid use of {}", __func__));
|
||||
const auto maybe_line = GetNextLine();
|
||||
if (!maybe_line) {
|
||||
return ParseRow();
|
||||
}
|
||||
|
||||
void Reader::TryInitializeHeader() {
|
||||
if (!HasHeader()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto header = ParseHeader();
|
||||
if (header.HasError()) {
|
||||
throw CsvReadException("CSV reading : {}", header.GetError().message);
|
||||
}
|
||||
|
||||
if (header->empty()) {
|
||||
throw CsvReadException("CSV file {} empty!", path_);
|
||||
}
|
||||
Header header;
|
||||
// set the 'number_of_fields_' once this method is implemented fully
|
||||
return std::nullopt;
|
||||
|
||||
number_of_columns_ = header->size();
|
||||
header_ = *header;
|
||||
}
|
||||
|
||||
[[nodiscard]] bool Reader::HasHeader() const { return read_config_.with_header; }
|
||||
|
||||
const std::optional<Reader::Header> &Reader::GetHeader() const { return header_; }
|
||||
|
||||
namespace {
|
||||
enum class CsvParserState : uint8_t {
|
||||
INITIAL_FIELD,
|
||||
@ -59,8 +75,8 @@ bool SubstringStartsWith(const std::string_view str, size_t pos, const std::stri
|
||||
} // namespace
|
||||
|
||||
Reader::ParsingResult Reader::ParseRow() {
|
||||
std::vector<std::string> row;
|
||||
std::string column;
|
||||
utils::pmr::vector<utils::pmr::string> row(memory_);
|
||||
utils::pmr::string column(memory_);
|
||||
|
||||
auto state = CsvParserState::INITIAL_FIELD;
|
||||
|
||||
@ -79,21 +95,21 @@ Reader::ParsingResult Reader::ParseRow() {
|
||||
// Null bytes aren't allowed in CSVs.
|
||||
if (c == '\0') {
|
||||
return ParseError(ParseError::ErrorCode::NULL_BYTE,
|
||||
fmt::format("CSV: Line {:d} contains NULL byte", line_count_));
|
||||
fmt::format("CSV: Line {:d} contains NULL byte", line_count_ - 1));
|
||||
}
|
||||
|
||||
switch (state) {
|
||||
case CsvParserState::INITIAL_FIELD:
|
||||
case CsvParserState::NEXT_FIELD: {
|
||||
if (SubstringStartsWith(*maybe_line, i, read_config_.quote)) {
|
||||
if (SubstringStartsWith(*maybe_line, i, *read_config_.quote)) {
|
||||
// The current field is a quoted field.
|
||||
state = CsvParserState::QUOTING;
|
||||
i += read_config_.quote.size() - 1;
|
||||
} else if (SubstringStartsWith(*maybe_line, i, read_config_.delimiter)) {
|
||||
i += read_config_.quote->size() - 1;
|
||||
} else if (SubstringStartsWith(*maybe_line, i, *read_config_.delimiter)) {
|
||||
// The current field has an empty value.
|
||||
row.emplace_back("");
|
||||
state = CsvParserState::NEXT_FIELD;
|
||||
i += read_config_.delimiter.size() - 1;
|
||||
i += read_config_.delimiter->size() - 1;
|
||||
} else {
|
||||
// The current field is a regular field.
|
||||
column.push_back(c);
|
||||
@ -102,40 +118,40 @@ Reader::ParsingResult Reader::ParseRow() {
|
||||
break;
|
||||
}
|
||||
case CsvParserState::QUOTING: {
|
||||
auto quote_now = SubstringStartsWith(*maybe_line, i, read_config_.quote);
|
||||
auto quote_next = SubstringStartsWith(*maybe_line, i + read_config_.quote.size(), read_config_.quote);
|
||||
auto quote_now = SubstringStartsWith(*maybe_line, i, *read_config_.quote);
|
||||
auto quote_next = SubstringStartsWith(*maybe_line, i + read_config_.quote->size(), *read_config_.quote);
|
||||
if (quote_now && quote_next) {
|
||||
// This is an escaped quote character.
|
||||
column += read_config_.quote;
|
||||
i += read_config_.quote.size() * 2 - 1;
|
||||
column += *read_config_.quote;
|
||||
i += read_config_.quote->size() * 2 - 1;
|
||||
} else if (quote_now && !quote_next) {
|
||||
// This is the end of the quoted field.
|
||||
row.emplace_back(std::move(column));
|
||||
state = CsvParserState::EXPECT_DELIMITER;
|
||||
i += read_config_.quote.size() - 1;
|
||||
i += read_config_.quote->size() - 1;
|
||||
} else {
|
||||
column.push_back(c);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CsvParserState::NOT_QUOTING: {
|
||||
if (SubstringStartsWith(*maybe_line, i, read_config_.delimiter)) {
|
||||
if (SubstringStartsWith(*maybe_line, i, *read_config_.delimiter)) {
|
||||
row.emplace_back(std::move(column));
|
||||
state = CsvParserState::NEXT_FIELD;
|
||||
i += read_config_.delimiter.size() - 1;
|
||||
i += read_config_.delimiter->size() - 1;
|
||||
} else {
|
||||
column.push_back(c);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CsvParserState::EXPECT_DELIMITER: {
|
||||
if (SubstringStartsWith(*maybe_line, i, read_config_.delimiter)) {
|
||||
if (SubstringStartsWith(*maybe_line, i, *read_config_.delimiter)) {
|
||||
state = CsvParserState::NEXT_FIELD;
|
||||
i += read_config_.delimiter.size() - 1;
|
||||
i += read_config_.delimiter->size() - 1;
|
||||
} else {
|
||||
return ParseError(ParseError::ErrorCode::UNEXPECTED_TOKEN,
|
||||
fmt::format("CSV Reader: Expected '{}' after '{}', but got '{}'", read_config_.delimiter,
|
||||
read_config_.quote, c));
|
||||
fmt::format("CSV Reader: Expected '{}' after '{}', but got '{}' at line {:d}",
|
||||
*read_config_.delimiter, *read_config_.quote, c, line_count_ - 1));
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -168,47 +184,42 @@ Reader::ParsingResult Reader::ParseRow() {
|
||||
|
||||
// reached the end of file - return empty row
|
||||
if (row.empty()) {
|
||||
return Row(row);
|
||||
return row;
|
||||
}
|
||||
|
||||
// if there's no header, then:
|
||||
// - if we skip bad rows, then the very first __valid__ row will
|
||||
// determine the allowed number of columns
|
||||
// - if we don't skip bad rows, the very first row will determine the allowed
|
||||
// number of columns in all subsequent rows
|
||||
if (!read_config_.with_header && number_of_columns_ == 0) {
|
||||
MG_ASSERT(!row.empty());
|
||||
number_of_columns_ = row.size();
|
||||
// Has header, but the header has already been read and the number_of_columns_
|
||||
// is already set. Otherwise, we would get an error every time we'd try to
|
||||
// parse the header.
|
||||
// Also, if we don't have a header, the 'number_of_columns_' will be 0, so no
|
||||
// need to check the number of columns.
|
||||
if (UNLIKELY(number_of_columns_ != 0 && row.size() != number_of_columns_)) {
|
||||
return ParseError(ParseError::ErrorCode::BAD_NUM_OF_COLUMNS,
|
||||
// ToDo(the-joksim):
|
||||
// - 'line_count_ - 1' is the last line of a row (as a
|
||||
// row may span several lines) ==> should have a row
|
||||
// counter
|
||||
fmt::format("Expected {:d} columns in row {:d}, but got {:d}", number_of_columns_,
|
||||
line_count_ - 1, row.size()));
|
||||
}
|
||||
|
||||
if (row.size() != number_of_columns_) {
|
||||
return ParseError(
|
||||
ParseError::ErrorCode::BAD_NUM_OF_COLUMNS,
|
||||
// ToDo(the-joksim):
|
||||
// - 'line_count_ - 1' is the last line of a row (as a
|
||||
// row may span several lines) ==> should have a row
|
||||
// counter
|
||||
fmt::format("Expected {:d} columns in row {:d}, but got {:d}", number_of_columns_, line_count_, row.size()));
|
||||
}
|
||||
|
||||
return Row(row);
|
||||
return row;
|
||||
}
|
||||
|
||||
// Returns Reader::Row if the read row if valid;
|
||||
// Returns std::nullopt if end of file is reached or an error occurred
|
||||
// making it unreadable;
|
||||
// @throws CsvReadException if a bad row is encountered, and the skip_bad is set
|
||||
// @throws CsvReadException if a bad row is encountered, and the ignore_bad is set
|
||||
// to 'true' in the Reader::Config.
|
||||
std::optional<Reader::Row> Reader::GetNextRow() {
|
||||
auto row = ParseRow();
|
||||
|
||||
if (row.HasError()) {
|
||||
if (!read_config_.skip_bad) {
|
||||
throw CsvReadException("CSV Reader: Bad row at line {:d}: {}", line_count_, row.GetError().message);
|
||||
if (!read_config_.ignore_bad) {
|
||||
throw CsvReadException("CSV Reader: Bad row at line {:d}: {}", line_count_ - 1, row.GetError().message);
|
||||
}
|
||||
// try to parse as many times as necessary to reach a valid row
|
||||
do {
|
||||
spdlog::debug("CSV Reader: Bad row at line {:d}: {}", line_count_, row.GetError().message);
|
||||
spdlog::debug("CSV Reader: Bad row at line {:d}: {}", line_count_ - 1, row.GetError().message);
|
||||
if (!csv_stream_.good()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
@ -216,12 +227,11 @@ std::optional<Reader::Row> Reader::GetNextRow() {
|
||||
} while (row.HasError());
|
||||
}
|
||||
|
||||
auto ret = row.GetValue();
|
||||
if (ret.columns.empty()) {
|
||||
if (row->empty()) {
|
||||
// reached end of file
|
||||
return std::nullopt;
|
||||
}
|
||||
return ret;
|
||||
return *row;
|
||||
}
|
||||
|
||||
} // namespace csv
|
||||
|
@ -8,13 +8,15 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <filesystem>
|
||||
#include <fstream>
|
||||
#include <optional>
|
||||
#include <filesystem>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/pmr/string.hpp"
|
||||
#include "utils/pmr/vector.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
namespace csv {
|
||||
@ -26,38 +28,38 @@ class CsvReadException : public utils::BasicException {
|
||||
class Reader {
|
||||
public:
|
||||
struct Config {
|
||||
Config(){};
|
||||
Config(std::string delimiter, std::string quote, const bool with_header, const bool skip_bad)
|
||||
: delimiter(std::move(delimiter)), quote(std::move(quote)), with_header(with_header), skip_bad(skip_bad) {}
|
||||
Config() = default;
|
||||
Config(const bool with_header, const bool ignore_bad, std::optional<utils::pmr::string> delim,
|
||||
std::optional<utils::pmr::string> qt)
|
||||
: with_header(with_header), ignore_bad(ignore_bad), delimiter(std::move(delim)), quote(std::move(qt)) {}
|
||||
|
||||
std::string delimiter{","};
|
||||
std::string quote{"\""};
|
||||
bool with_header{false};
|
||||
bool skip_bad{false};
|
||||
bool ignore_bad{false};
|
||||
std::optional<utils::pmr::string> delimiter{};
|
||||
std::optional<utils::pmr::string> quote{};
|
||||
};
|
||||
|
||||
struct Row {
|
||||
Row() = default;
|
||||
explicit Row(std::vector<std::string> cols) : columns(std::move(cols)) {}
|
||||
std::vector<std::string> columns;
|
||||
};
|
||||
using Row = utils::pmr::vector<utils::pmr::string>;
|
||||
using Header = utils::pmr::vector<utils::pmr::string>;
|
||||
|
||||
explicit Reader(const std::filesystem::path &path, const Config cfg = {}) : path_(path), read_config_(cfg) {
|
||||
Reader() = default;
|
||||
explicit Reader(std::filesystem::path path, Config cfg, utils::MemoryResource *mem = utils::NewDeleteResource())
|
||||
: path_(std::move(path)), memory_(mem) {
|
||||
read_config_.with_header = cfg.with_header;
|
||||
read_config_.ignore_bad = cfg.ignore_bad;
|
||||
read_config_.delimiter = cfg.delimiter ? std::move(*cfg.delimiter) : utils::pmr::string{",", memory_};
|
||||
read_config_.quote = cfg.quote ? std::move(*cfg.quote) : utils::pmr::string{"\"", memory_};
|
||||
InitializeStream();
|
||||
if (read_config_.with_header) {
|
||||
header_ = ParseHeader();
|
||||
}
|
||||
TryInitializeHeader();
|
||||
}
|
||||
|
||||
Reader(const Reader &) = delete;
|
||||
Reader &operator=(const Reader &) = delete;
|
||||
|
||||
Reader(Reader &&) = delete;
|
||||
Reader &operator=(Reader &&) = delete;
|
||||
Reader(Reader &&) = default;
|
||||
Reader &operator=(Reader &&) = default;
|
||||
|
||||
~Reader() {
|
||||
if (csv_stream_.is_open()) csv_stream_.close();
|
||||
}
|
||||
~Reader() = default;
|
||||
|
||||
struct ParseError {
|
||||
enum class ErrorCode : uint8_t { BAD_HEADER, NO_CLOSING_QUOTE, UNEXPECTED_TOKEN, BAD_NUM_OF_COLUMNS, NULL_BYTE };
|
||||
@ -68,6 +70,8 @@ class Reader {
|
||||
};
|
||||
|
||||
using ParsingResult = utils::BasicResult<ParseError, Row>;
|
||||
[[nodiscard]] bool HasHeader() const;
|
||||
const std::optional<Header> &GetHeader() const;
|
||||
std::optional<Row> GetNextRow();
|
||||
|
||||
private:
|
||||
@ -76,20 +80,16 @@ class Reader {
|
||||
Config read_config_;
|
||||
uint64_t line_count_{1};
|
||||
uint16_t number_of_columns_{0};
|
||||
|
||||
struct Header {
|
||||
Header() = default;
|
||||
explicit Header(std::vector<std::string> cols) : columns(std::move(cols)) {}
|
||||
std::vector<std::string> columns;
|
||||
};
|
||||
|
||||
std::optional<Header> header_{};
|
||||
utils::MemoryResource *memory_;
|
||||
|
||||
void InitializeStream();
|
||||
|
||||
std::optional<std::string> GetNextLine();
|
||||
void TryInitializeHeader();
|
||||
|
||||
std::optional<Header> ParseHeader();
|
||||
std::optional<utils::pmr::string> GetNextLine();
|
||||
|
||||
ParsingResult ParseHeader();
|
||||
|
||||
ParsingResult ParseRow();
|
||||
};
|
||||
|
@ -226,8 +226,8 @@ target_link_libraries(${test_prefix}utils_file_locker mg-utils fmt)
|
||||
add_unit_test(utils_thread_pool.cpp)
|
||||
target_link_libraries(${test_prefix}utils_thread_pool mg-utils fmt)
|
||||
|
||||
add_unit_test(csv_parsing.cpp ${CMAKE_SOURCE_DIR}/src/utils/csv_parsing.cpp)
|
||||
target_link_libraries(${test_prefix}csv_parsing mg-utils fmt)
|
||||
add_unit_test(utils_csv_parsing.cpp ${CMAKE_SOURCE_DIR}/src/utils/csv_parsing.cpp)
|
||||
target_link_libraries(${test_prefix}utils_csv_parsing mg-utils fmt)
|
||||
|
||||
# Test mg-storage-v2
|
||||
|
||||
|
@ -1,194 +0,0 @@
|
||||
#include "utils/csv_parsing.hpp"
|
||||
#include "gmock/gmock.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "utils/string.hpp"
|
||||
|
||||
|
||||
class CsvReaderTest : public ::testing::Test {
|
||||
protected:
|
||||
const std::filesystem::path csv_directory{std::filesystem::temp_directory_path() / "csv_testing"};
|
||||
|
||||
void SetUp() override { Clear(); CreateCsvDir(); }
|
||||
|
||||
void TearDown() override { Clear(); }
|
||||
|
||||
private:
|
||||
|
||||
void CreateCsvDir() {
|
||||
if (!std::filesystem::exists(csv_directory)) {
|
||||
std::filesystem::create_directory(csv_directory);
|
||||
}
|
||||
}
|
||||
void Clear() {
|
||||
if (!std::filesystem::exists(csv_directory)) return;
|
||||
std::filesystem::remove_all(csv_directory);
|
||||
}
|
||||
};
|
||||
|
||||
namespace {
|
||||
class FileWriter {
|
||||
public:
|
||||
explicit FileWriter(const std::filesystem::path path) { stream_.open(path); }
|
||||
|
||||
FileWriter(const FileWriter &) = delete;
|
||||
FileWriter &operator=(const FileWriter &) = delete;
|
||||
|
||||
FileWriter(FileWriter &&) = delete;
|
||||
FileWriter &operator=(FileWriter &&) = delete;
|
||||
|
||||
void Close() { stream_.close(); }
|
||||
|
||||
size_t WriteLine(const std::string_view line) {
|
||||
if (!stream_.is_open()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
stream_ << line << std::endl;
|
||||
|
||||
// including the newline character
|
||||
return line.size() + 1;
|
||||
}
|
||||
|
||||
private:
|
||||
std::ofstream stream_;
|
||||
};
|
||||
|
||||
std::string CreateRow(const std::vector<std::string> &columns, const std::string_view delim) {
|
||||
return utils::Join(columns, delim);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST_F(CsvReaderTest, CommaDelimiter) {
|
||||
// create a file with a valid and an invalid row;
|
||||
// the invalid row has wrong delimiters;
|
||||
// expect the parser's output to be a single string for the invalid row;
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
const std::vector<std::string> columns1{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns1, ","));
|
||||
|
||||
const std::vector<std::string> columns2{"D", "E", "F"};
|
||||
writer.WriteLine(CreateRow(columns2, ";"));
|
||||
|
||||
writer.Close();
|
||||
|
||||
// note - default delimiter is ","
|
||||
auto reader = csv::Reader(filepath);
|
||||
|
||||
auto parsed_row = reader.GetNextRow();
|
||||
ASSERT_EQ(parsed_row->columns, columns1);
|
||||
|
||||
EXPECT_THROW(reader.GetNextRow(), csv::CsvReadException);
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, SemicolonDelimiter) {
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
const std::string delimiter = ";";
|
||||
const std::vector<std::string> columns1{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns1, delimiter));
|
||||
|
||||
const std::vector<std::string> columns2{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns2, ","));
|
||||
|
||||
writer.Close();
|
||||
|
||||
const csv::Reader::Config cfg(delimiter, "\"", false, false);
|
||||
auto reader = csv::Reader(filepath, cfg);
|
||||
|
||||
auto parsed_row = reader.GetNextRow();
|
||||
ASSERT_EQ(parsed_row->columns, columns1);
|
||||
|
||||
EXPECT_THROW(reader.GetNextRow(), csv::CsvReadException);
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, SkipBad) {
|
||||
// create a file with invalid first two rows (containing a string with a
|
||||
// missing closing quote);
|
||||
// the last row is valid;
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
const std::string delimiter = ",";
|
||||
|
||||
const std::vector<std::string> columns_bad{"A", "B", "\"C"};
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
|
||||
const std::vector<std::string> columns_good{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns_good, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
{
|
||||
// we set the 'skip_bad' flag in the read configuration to 'true';
|
||||
// parser's output should be solely the valid row;
|
||||
const bool skip_bad = true;
|
||||
const csv::Reader::Config cfg(delimiter, "\"", false, skip_bad);
|
||||
auto reader = csv::Reader(filepath, cfg);
|
||||
|
||||
auto parsed_row = reader.GetNextRow();
|
||||
ASSERT_EQ(parsed_row->columns, columns_good);
|
||||
}
|
||||
|
||||
{
|
||||
// we set the 'skip_bad' flag in the read configuration to 'false';
|
||||
// an exception must be thrown;
|
||||
const bool skip_bad = false;
|
||||
const csv::Reader::Config cfg(delimiter, "\"", false, skip_bad);
|
||||
auto reader = csv::Reader(filepath, cfg);
|
||||
|
||||
EXPECT_THROW(reader.GetNextRow(), csv::CsvReadException);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, AllRowsValid) {
|
||||
// create a file with all rows valid;
|
||||
// parser should return 'std::nullopt'
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
const std::string delimiter = ",";
|
||||
|
||||
const std::vector<std::string> columns{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
const bool skip_bad = false;
|
||||
const csv::Reader::Config cfg(delimiter, "\"", false, skip_bad);
|
||||
auto reader = csv::Reader(filepath, cfg);
|
||||
|
||||
while (auto parsed_row = reader.GetNextRow()) {
|
||||
ASSERT_EQ(parsed_row->columns, columns);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, SkipAllRows) {
|
||||
// create a file with all rows invalid (containing a string with a missing closing quote);
|
||||
// parser should return 'std::nullopt'
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
const std::string delimiter = ",";
|
||||
|
||||
const std::vector<std::string> columns_bad{"A", "B", "\"C"};
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
const bool skip_bad = true;
|
||||
const csv::Reader::Config cfg(delimiter, "\"", false, skip_bad);
|
||||
auto reader = csv::Reader(filepath, cfg);
|
||||
|
||||
auto parsed_row = reader.GetNextRow();
|
||||
ASSERT_EQ(parsed_row, std::nullopt);
|
||||
}
|
@ -2869,4 +2869,70 @@ TEST_P(CypherMainVisitorTest, TestLockPathQuery) {
|
||||
test_lock_path_query("UNLOCK", LockPathQuery::Action::UNLOCK_PATH);
|
||||
}
|
||||
|
||||
TEST_P(CypherMainVisitorTest, TestLoadCsvClause) {
|
||||
auto &ast_generator = *GetParam();
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv")";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv" WITH)";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv" WITH HEADER)";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv" WITH HEADER DELIMITER ";")";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv" WITH HEADER DELIMITER ";" QUOTE "'")";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv" WITH HEADER DELIMITER ";" QUOTE "'" AS)";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM file WITH HEADER IGNORE BAD DELIMITER ";" QUOTE "'" AS x)";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SyntaxException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv" WITH HEADER IGNORE BAD DELIMITER 0 QUOTE "'" AS x)";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SemanticException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv" WITH HEADER IGNORE BAD DELIMITER ";" QUOTE 0 AS x)";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SemanticException);
|
||||
}
|
||||
|
||||
{
|
||||
// can't be a standalone clause
|
||||
const std::string query = R"(LOAD CSV FROM "file.csv" WITH HEADER IGNORE BAD DELIMITER ";" QUOTE "'" AS x)";
|
||||
ASSERT_THROW(ast_generator.ParseQuery(query), SemanticException);
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query =
|
||||
R"(LOAD CSV FROM "file.csv" WITH HEADER IGNORE BAD DELIMITER ";" QUOTE "'" AS x RETURN x)";
|
||||
auto *parsed_query = dynamic_cast<CypherQuery *>(ast_generator.ParseQuery(query));
|
||||
ASSERT_TRUE(parsed_query);
|
||||
auto *load_csv_clause = dynamic_cast<LoadCsv *>(parsed_query->single_query_->clauses_[0]);
|
||||
ASSERT_TRUE(load_csv_clause);
|
||||
ASSERT_TRUE(load_csv_clause->with_header_);
|
||||
ASSERT_TRUE(load_csv_clause->ignore_bad_);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "query/typed_value.hpp"
|
||||
#include "query_common.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "utils/csv_parsing.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
namespace {
|
||||
@ -193,6 +194,11 @@ TEST_F(InterpreterTest, Parameters) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(InterpreterTest, LoadCsv) {
|
||||
// for debug purposes
|
||||
auto [stream, qid] = Prepare(R"(LOAD CSV FROM "simple.csv" NO HEADER AS row RETURN row)");
|
||||
}
|
||||
|
||||
// Test bfs end to end.
|
||||
TEST_F(InterpreterTest, Bfs) {
|
||||
srand(0);
|
||||
@ -776,3 +782,116 @@ TEST_F(InterpreterTest, Qid) {
|
||||
interpreter_.CommitTransaction();
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
// copied from utils_csv_parsing.cpp - tmp dir management and csv file writer
|
||||
class TmpCsvDirManager final {
|
||||
public:
|
||||
TmpCsvDirManager() { CreateCsvDir(); }
|
||||
~TmpCsvDirManager() { Clear(); }
|
||||
|
||||
const std::filesystem::path &Path() const { return tmp_dir_; }
|
||||
|
||||
private:
|
||||
const std::filesystem::path tmp_dir_{std::filesystem::temp_directory_path() / "csv_directory"};
|
||||
|
||||
void CreateCsvDir() {
|
||||
if (!std::filesystem::exists(tmp_dir_)) {
|
||||
std::filesystem::create_directory(tmp_dir_);
|
||||
}
|
||||
}
|
||||
|
||||
void Clear() {
|
||||
if (!std::filesystem::exists(tmp_dir_)) return;
|
||||
std::filesystem::remove_all(tmp_dir_);
|
||||
}
|
||||
};
|
||||
|
||||
class FileWriter {
|
||||
public:
|
||||
explicit FileWriter(const std::filesystem::path path) { stream_.open(path); }
|
||||
|
||||
FileWriter(const FileWriter &) = delete;
|
||||
FileWriter &operator=(const FileWriter &) = delete;
|
||||
|
||||
FileWriter(FileWriter &&) = delete;
|
||||
FileWriter &operator=(FileWriter &&) = delete;
|
||||
|
||||
void Close() { stream_.close(); }
|
||||
|
||||
size_t WriteLine(const std::string_view line) {
|
||||
if (!stream_.is_open()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
stream_ << line << std::endl;
|
||||
|
||||
// including the newline character
|
||||
return line.size() + 1;
|
||||
}
|
||||
|
||||
private:
|
||||
std::ofstream stream_;
|
||||
};
|
||||
|
||||
std::string CreateRow(const std::vector<std::string> &columns, const std::string_view delim) {
|
||||
return utils::Join(columns, delim);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
TEST_F(InterpreterTest, LoadCsvClause) {
|
||||
auto dir_manager = TmpCsvDirManager();
|
||||
const auto csv_path = dir_manager.Path() / "file.csv";
|
||||
auto writer = FileWriter(csv_path);
|
||||
|
||||
const std::string delimiter{"|"};
|
||||
|
||||
const std::vector<std::string> header{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(header, delimiter));
|
||||
|
||||
const std::vector<std::string> good_columns_1{"a", "b", "c"};
|
||||
writer.WriteLine(CreateRow(good_columns_1, delimiter));
|
||||
|
||||
const std::vector<std::string> bad_columns{"\"\"1", "2", "3"};
|
||||
writer.WriteLine(CreateRow(bad_columns, delimiter));
|
||||
|
||||
const std::vector<std::string> good_columns_2{"d", "e", "f"};
|
||||
writer.WriteLine(CreateRow(good_columns_2, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
{
|
||||
const std::string query = fmt::format(R"(LOAD CSV FROM "{}" WITH HEADER IGNORE BAD DELIMITER "{}" AS x RETURN x.A)",
|
||||
csv_path.string(), delimiter);
|
||||
auto [stream, qid] = Prepare(query);
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "x.A");
|
||||
|
||||
Pull(&stream, 1);
|
||||
ASSERT_EQ(stream.GetSummary().count("has_more"), 1);
|
||||
ASSERT_TRUE(stream.GetSummary().at("has_more").ValueBool());
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0][0].ValueString(), "a");
|
||||
|
||||
Pull(&stream, 1);
|
||||
ASSERT_EQ(stream.GetSummary().count("has_more"), 1);
|
||||
ASSERT_FALSE(stream.GetSummary().at("has_more").ValueBool());
|
||||
ASSERT_EQ(stream.GetResults().size(), 2U);
|
||||
ASSERT_EQ(stream.GetResults()[1][0].ValueString(), "d");
|
||||
}
|
||||
|
||||
{
|
||||
const std::string query = fmt::format(R"(LOAD CSV FROM "{}" WITH HEADER IGNORE BAD DELIMITER "{}" AS x RETURN x.C)",
|
||||
csv_path.string(), delimiter);
|
||||
auto [stream, qid] = Prepare(query);
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "x.C");
|
||||
|
||||
Pull(&stream);
|
||||
ASSERT_EQ(stream.GetSummary().count("has_more"), 1);
|
||||
ASSERT_FALSE(stream.GetSummary().at("has_more").ValueBool());
|
||||
ASSERT_EQ(stream.GetResults().size(), 2U);
|
||||
ASSERT_EQ(stream.GetResults()[0][0].ValueString(), "c");
|
||||
ASSERT_EQ(stream.GetResults()[1][0].ValueString(), "f");
|
||||
}
|
||||
}
|
||||
|
250
tests/unit/utils_csv_parsing.cpp
Normal file
250
tests/unit/utils_csv_parsing.cpp
Normal file
@ -0,0 +1,250 @@
|
||||
#include "gmock/gmock.h"
|
||||
#include "gtest/gtest.h"
|
||||
#include "utils/csv_parsing.hpp"
|
||||
|
||||
#include "utils/string.hpp"
|
||||
|
||||
class CsvReaderTest : public ::testing::Test {
|
||||
protected:
|
||||
const std::filesystem::path csv_directory{std::filesystem::temp_directory_path() / "csv_testing"};
|
||||
|
||||
void SetUp() override {
|
||||
Clear();
|
||||
CreateCsvDir();
|
||||
}
|
||||
|
||||
void TearDown() override { Clear(); }
|
||||
|
||||
private:
|
||||
void CreateCsvDir() {
|
||||
if (!std::filesystem::exists(csv_directory)) {
|
||||
std::filesystem::create_directory(csv_directory);
|
||||
}
|
||||
}
|
||||
void Clear() {
|
||||
if (!std::filesystem::exists(csv_directory)) return;
|
||||
std::filesystem::remove_all(csv_directory);
|
||||
}
|
||||
};
|
||||
|
||||
namespace {
|
||||
class FileWriter {
|
||||
public:
|
||||
explicit FileWriter(const std::filesystem::path path) { stream_.open(path); }
|
||||
|
||||
FileWriter(const FileWriter &) = delete;
|
||||
FileWriter &operator=(const FileWriter &) = delete;
|
||||
|
||||
FileWriter(FileWriter &&) = delete;
|
||||
FileWriter &operator=(FileWriter &&) = delete;
|
||||
|
||||
void Close() { stream_.close(); }
|
||||
|
||||
size_t WriteLine(const std::string_view line) {
|
||||
if (!stream_.is_open()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
stream_ << line << std::endl;
|
||||
|
||||
// including the newline character
|
||||
return line.size() + 1;
|
||||
}
|
||||
|
||||
private:
|
||||
std::ofstream stream_;
|
||||
};
|
||||
|
||||
std::string CreateRow(const std::vector<std::string> &columns, const std::string_view delim) {
|
||||
return utils::Join(columns, delim);
|
||||
}
|
||||
|
||||
auto ToPmrColumns(const std::vector<std::string> &columns) {
|
||||
utils::pmr::vector<utils::pmr::string> pmr_columns(utils::NewDeleteResource());
|
||||
for (const auto &col : columns) {
|
||||
pmr_columns.emplace_back(col);
|
||||
}
|
||||
return pmr_columns;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST_F(CsvReaderTest, CommaDelimiter) {
|
||||
// create a file with a single valid row;
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
const std::vector<std::string> columns{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns, ","));
|
||||
|
||||
writer.Close();
|
||||
|
||||
utils::MemoryResource *mem{utils::NewDeleteResource()};
|
||||
|
||||
bool with_header = false;
|
||||
bool ignore_bad = false;
|
||||
utils::pmr::string delimiter{",", mem};
|
||||
utils::pmr::string quote{"\"", mem};
|
||||
|
||||
csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
|
||||
auto reader = csv::Reader(filepath, cfg, mem);
|
||||
|
||||
auto parsed_row = reader.GetNextRow();
|
||||
ASSERT_EQ(parsed_row, ToPmrColumns(columns));
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, SemicolonDelimiter) {
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
utils::MemoryResource *mem(utils::NewDeleteResource());
|
||||
|
||||
const utils::pmr::string delimiter{";", mem};
|
||||
const utils::pmr::string quote{"\"", mem};
|
||||
|
||||
const std::vector<std::string> columns{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
const bool with_header = false;
|
||||
const bool ignore_bad = false;
|
||||
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
|
||||
auto reader = csv::Reader(filepath, cfg, mem);
|
||||
|
||||
auto parsed_row = reader.GetNextRow();
|
||||
ASSERT_EQ(parsed_row, ToPmrColumns(columns));
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, SkipBad) {
|
||||
// create a file with invalid first two rows (containing a string with a
|
||||
// missing closing quote);
|
||||
// the last row is valid;
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
utils::MemoryResource *mem(utils::NewDeleteResource());
|
||||
|
||||
const utils::pmr::string delimiter{";", mem};
|
||||
const utils::pmr::string quote{"\"", mem};
|
||||
|
||||
const std::vector<std::string> columns_bad{"A", "B", "\"\"C"};
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
|
||||
const std::vector<std::string> columns_good{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns_good, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
{
|
||||
// we set the 'ignore_bad' flag in the read configuration to 'true';
|
||||
// parser's output should be solely the valid row;
|
||||
const bool with_header = false;
|
||||
const bool ignore_bad = true;
|
||||
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
|
||||
auto reader = csv::Reader(filepath, cfg, mem);
|
||||
|
||||
auto parsed_row = reader.GetNextRow();
|
||||
ASSERT_EQ(parsed_row, ToPmrColumns(columns_good));
|
||||
}
|
||||
|
||||
{
|
||||
// we set the 'ignore_bad' flag in the read configuration to 'false';
|
||||
// an exception must be thrown;
|
||||
const bool with_header = false;
|
||||
const bool ignore_bad = false;
|
||||
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
|
||||
auto reader = csv::Reader(filepath, cfg, mem);
|
||||
|
||||
EXPECT_THROW(reader.GetNextRow(), csv::CsvReadException);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, AllRowsValid) {
|
||||
// create a file with all rows valid;
|
||||
// parser should return 'std::nullopt'
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
utils::MemoryResource *mem(utils::NewDeleteResource());
|
||||
|
||||
const utils::pmr::string delimiter{",", mem};
|
||||
const utils::pmr::string quote{"\"", mem};
|
||||
|
||||
std::vector<std::string> columns{"A", "B", "C"};
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
const bool with_header = false;
|
||||
const bool ignore_bad = false;
|
||||
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
|
||||
auto reader = csv::Reader(filepath, cfg);
|
||||
|
||||
const auto pmr_columns = ToPmrColumns(columns);
|
||||
while (auto parsed_row = reader.GetNextRow()) {
|
||||
ASSERT_EQ(parsed_row, pmr_columns);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, SkipAllRows) {
|
||||
// create a file with all rows invalid (containing a string with a missing closing quote);
|
||||
// parser should return 'std::nullopt'
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
utils::MemoryResource *mem(utils::NewDeleteResource());
|
||||
|
||||
const utils::pmr::string delimiter{",", mem};
|
||||
const utils::pmr::string quote{"\"", mem};
|
||||
|
||||
const std::vector<std::string> columns_bad{"A", "B", "\"\"C"};
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
writer.WriteLine(CreateRow(columns_bad, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
const bool with_header = false;
|
||||
const bool ignore_bad = true;
|
||||
const csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
|
||||
auto reader = csv::Reader(filepath, cfg);
|
||||
|
||||
auto parsed_row = reader.GetNextRow();
|
||||
ASSERT_EQ(parsed_row, std::nullopt);
|
||||
}
|
||||
|
||||
TEST_F(CsvReaderTest, WithHeader) {
|
||||
const auto filepath = csv_directory / "bla.csv";
|
||||
auto writer = FileWriter(filepath);
|
||||
|
||||
utils::MemoryResource *mem(utils::NewDeleteResource());
|
||||
|
||||
const utils::pmr::string delimiter{",", mem};
|
||||
const utils::pmr::string quote{"\"", mem};
|
||||
|
||||
const std::vector<std::string> header{"A", "B", "C"};
|
||||
const std::vector<std::string> columns{"1", "2", "3"};
|
||||
writer.WriteLine(CreateRow(header, delimiter));
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
writer.WriteLine(CreateRow(columns, delimiter));
|
||||
|
||||
writer.Close();
|
||||
|
||||
const bool with_header = true;
|
||||
const bool ignore_bad = false;
|
||||
const csv::Reader::Config cfg(with_header, ignore_bad, delimiter, quote);
|
||||
auto reader = csv::Reader(filepath, cfg);
|
||||
|
||||
const auto pmr_header = ToPmrColumns(header);
|
||||
ASSERT_EQ(reader.GetHeader(), pmr_header);
|
||||
|
||||
const auto pmr_columns = ToPmrColumns(columns);
|
||||
while (auto parsed_row = reader.GetNextRow()) {
|
||||
ASSERT_EQ(parsed_row, pmr_columns);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user