Fix MATCH + LOAD CSV to load CSV only once (#916)

* update profile query to use poolresource
* Optimize update of indexes
* Add ignore empty strings to load csv
* Add operator changes to support handling of nulls
* Store chunks in memory pools ordered
* Use same max block per chunks number
* Remove redundant return statement
* add hacky cached solution
* change map to set
* remove memory
* Add match load csv invalid behaviour commit
* Accept input on LOAD CSV
* Omit changes not tied to the PR
* Add tests for match + load csv
* Add gqlalchemy installation for e2e tests
* Modify setup script to update packages
* Revert gqlalchemy to 1.3.3
* Revert gqlalchemy to 1.3.3
* Address PR review comments
* Omit semicolon
---------

Co-authored-by: antoniofilipovic <filipovicantonio1998@gmail.com>
Co-authored-by: János Benjamin Antal <benjamin.antal@memgraph.io>
This commit is contained in:
Josipmrden 2023-06-21 11:13:40 +02:00 committed by GitHub
parent df95775222
commit 63f8298033
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 132 additions and 20 deletions

View File

@ -4648,7 +4648,7 @@ LoadCsv::LoadCsv(std::shared_ptr<LogicalOperator> input, Expression *file, bool
MG_ASSERT(file_, "Something went wrong - '{}' member file_ shouldn't be a nullptr", __func__);
}
bool LoadCsv::Accept(HierarchicalLogicalOperatorVisitor &visitor) { return false; };
ACCEPT_WITH_INPUT(LoadCsv)
class LoadCsvCursor;
@ -4699,14 +4699,12 @@ TypedValue CsvRowToTypedMap(csv::Reader::Row &row, csv::Reader::Header header) {
class LoadCsvCursor : public Cursor {
const LoadCsv *self_;
const UniqueCursorPtr input_cursor_;
bool input_is_once_;
bool did_pull_;
std::optional<csv::Reader> reader_{};
public:
LoadCsvCursor(const LoadCsv *self, utils::MemoryResource *mem)
: self_(self), input_cursor_(self_->input_->MakeCursor(mem)) {
input_is_once_ = dynamic_cast<Once *>(self_->input_.get());
}
: self_(self), input_cursor_(self_->input_->MakeCursor(mem)), did_pull_{false} {}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("LoadCsv");
@ -4722,14 +4720,14 @@ class LoadCsvCursor : public Cursor {
reader_ = MakeReader(&context.evaluation_context);
}
bool input_pulled = input_cursor_->Pull(frame, context);
if (input_cursor_->Pull(frame, context)) {
if (did_pull_) {
throw QueryRuntimeException(
"LOAD CSV can be executed only once, please check if the cardinality of the operator before LOAD CSV is 1");
}
did_pull_ = true;
}
// If the input is Once, we have to keep going until we read all the rows,
// regardless of whether the pull on Once returned false.
// If we have e.g. MATCH(n) LOAD CSV ... AS x SET n.name = x.name, then we
// have to read at most cardinality(n) rows (but we can read less and stop
// pulling MATCH).
if (!input_is_once_ && !input_pulled) return false;
auto row = reader_->GetNextRow(context.evaluation_context.memory);
if (!row) {
return false;

View File

@ -874,11 +874,27 @@ bool PlanToJsonVisitor::PreVisit(query::plan::CallProcedure &op) {
bool PlanToJsonVisitor::PreVisit(query::plan::LoadCsv &op) {
json self;
self["name"] = "LoadCsv";
self["file"] = ToJson(op.file_);
self["with_header"] = op.with_header_;
self["ignore_bad"] = op.ignore_bad_;
self["delimiter"] = ToJson(op.delimiter_);
self["quote"] = ToJson(op.quote_);
if (op.file_) {
self["file"] = ToJson(op.file_);
}
if (op.with_header_) {
self["with_header"] = op.with_header_;
}
if (op.ignore_bad_) {
self["ignore_bad"] = op.ignore_bad_;
}
if (op.delimiter_) {
self["delimiter"] = ToJson(op.delimiter_);
}
if (op.quote_) {
self["quote"] = ToJson(op.quote_);
}
self["row_variable"] = ToJson(op.row_var_);
op.input_->Accept(*this);

View File

@ -477,6 +477,16 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor {
return true;
}
// Called when the visitor enters a LoadCsv operator: records the operator on
// prev_ops_ so it stays on the stack while its subtree is being visited.
// NOTE(review): returning true presumably tells the hierarchical visitor to
// descend into the operator's input -- confirm against the visitor contract.
bool PreVisit(LoadCsv &op) override {
prev_ops_.push_back(&op);
return true;
}
// Called when the visitor leaves a LoadCsv operator: removes the most recent
// entry from prev_ops_, balancing the push done in PreVisit(LoadCsv &).
bool PostVisit(LoadCsv & /*op*/) override {
prev_ops_.pop_back();
return true;
}
std::shared_ptr<LogicalOperator> new_root_;
private:

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -55,6 +55,7 @@ add_subdirectory(python_query_modules_reloading)
add_subdirectory(analyze_graph)
add_subdirectory(transaction_queue)
add_subdirectory(mock_api)
add_subdirectory(load_csv)
add_subdirectory(init_file_flags)
copy_e2e_python_files(pytest_runner pytest_runner.sh "")

View File

@ -0,0 +1,10 @@
# Copies a single file of the load_csv e2e test by forwarding FILE_NAME to
# copy_e2e_python_files with the "load_csv" target prefix.
function(copy_load_csv_e2e_python_files FILE_NAME)
copy_e2e_python_files(load_csv ${FILE_NAME})
endfunction()
# Copies a non-Python file (e.g. a CSV fixture) of the load_csv e2e test.
# NOTE(review): this forwards to copy_e2e_python_files exactly like
# copy_load_csv_e2e_python_files does -- presumably the separate name only
# distinguishes data files from Python sources; confirm this is intended.
function(copy_load_csv_e2e_files FILE_NAME)
copy_e2e_python_files(load_csv ${FILE_NAME})
endfunction()
# Test script followed by the CSV fixture it loads.
copy_load_csv_e2e_python_files(load_csv.py)
copy_load_csv_e2e_files(simple.csv)

View File

@ -0,0 +1,56 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import sys
from pathlib import Path
import pytest
from gqlalchemy import Memgraph
from mgclient import DatabaseError
# File name of the CSV fixture used by the tests; it is turned into an absolute
# path with get_file_path() before being embedded in a query.
SIMPLE_CSV_FILE = "simple.csv"
def get_file_path(file: str) -> str:
    """Return the path of `file` joined onto the absolute directory of this script."""
    script_dir = Path(__file__).parent.absolute()
    return os.path.join(script_dir, file)
def test_given_two_rows_in_db_when_load_csv_after_match_then_throw_exception():
    """An unfiltered MATCH before LOAD CSV must make the query fail with DatabaseError.

    The test name assumes the database holds two nodes, so MATCH (n) feeds
    more than one row into LOAD CSV.
    """
    memgraph = Memgraph("localhost", 7687)
    query = f"""MATCH (n) LOAD CSV
FROM '{get_file_path(SIMPLE_CSV_FILE)}' WITH HEADER AS row
CREATE (:Person {{name: row.name}})
"""

    with pytest.raises(DatabaseError):
        # Pull one result explicitly; presumably execute_and_fetch is lazy and
        # the query only runs once the iterator is advanced.
        rows = memgraph.execute_and_fetch(query)
        next(rows)
def test_given_one_row_in_db_when_load_csv_after_match_then_pass():
    """A MATCH filtered on prop = 1 before LOAD CSV must succeed and return four results."""
    memgraph = Memgraph("localhost", 7687)
    query = f"""MATCH (n {{prop: 1}}) LOAD CSV
FROM '{get_file_path(SIMPLE_CSV_FILE)}' WITH HEADER AS row
CREATE (:Person {{name: row.name}})
RETURN n
"""

    fetched = list(memgraph.execute_and_fetch(query))
    assert len(fetched) == 4
if __name__ == "__main__":
    # Run only this file through pytest and propagate its exit status to the caller.
    exit_code = pytest.main([__file__, "-rA"])
    sys.exit(exit_code)

View File

@ -0,0 +1,5 @@
id,name
1,Joseph
2,Peter
3,Ella
4,Joe
1 id name
2 1 Joseph
3 2 Peter
4 3 Ella
5 4 Joe

View File

@ -0,0 +1,15 @@
# Cluster definition for the load_csv e2e test, anchored so the workload below can reuse it.
load_csv_cluster: &load_csv_cluster
cluster:
main:
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "load_csv_log_file.txt"
# Seed two nodes: an unfiltered MATCH (n) then yields two rows, while
# MATCH (n {prop: 1}) yields exactly one.
setup_queries:
- "CREATE (n {prop: 1});"
- "CREATE (n {prop: 2});"
validation_queries: []
workloads:
- name: "MATCH + LOAD CSV"
binary: "tests/e2e/pytest_runner.sh"
args: ["load_csv/load_csv.py"]
# Merge the cluster definition anchored above into this workload.
<<: *load_csv_cluster

View File

@ -1,6 +1,7 @@
#!/bin/bash
# shellcheck disable=1091
set -Eeuo pipefail
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source