Add HTTP+GZIP support to LOAD CSV (#1027)

This commit is contained in:
Gareth Andrew Lloyd 2023-06-26 18:10:48 +01:00 committed by GitHub
parent d573eda8bb
commit 3b781bf525
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 490 additions and 116 deletions

View File

@ -1,6 +1,7 @@
# MemGraph CMake configuration
cmake_minimum_required(VERSION 3.8)
cmake_minimum_required(VERSION 3.12)
cmake_policy(SET CMP0076 NEW)
# !! IMPORTANT !! run ./project_root/init.sh before cmake command
# to download dependencies
@ -18,10 +19,12 @@ set_directory_properties(PROPERTIES CLEAN_NO_CUSTOM TRUE)
# during the code coverage process
find_program(CCACHE_FOUND ccache)
option(USE_CCACHE "ccache:" ON)
message(STATUS "CCache: ${USE_CCACHE}")
if(CCACHE_FOUND AND USE_CCACHE)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache)
message(STATUS "CCache: Used")
else ()
message(STATUS "CCache: Not used")
endif(CCACHE_FOUND AND USE_CCACHE)
# choose a compiler
@ -37,7 +40,14 @@ endif()
# -----------------------------------------------------------------------------
project(memgraph)
project(memgraph LANGUAGES C CXX)
#TODO: upgrade to cmake 3.24 + CheckIPOSupported
#cmake_policy(SET CMP0138 NEW)
#include(CheckIPOSupported)
#check_ipo_supported()
#set(CMAKE_INTERPROCEDURAL_OPTIMIZATION_Release TRUE)
#set(CMAKE_INTERPROCEDURAL_OPTIMIZATION_RelWithDebInfo TRUE)
# Install licenses.
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/licenses/
@ -160,7 +170,7 @@ endif()
# setup CMake module path, defines path for include() and find_package()
# https://cmake.org/cmake/help/latest/variable/CMAKE_MODULE_PATH.html
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${PROJECT_SOURCE_DIR}/cmake)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
# custom function definitions
include(functions)
# -----------------------------------------------------------------------------

View File

@ -29,6 +29,7 @@ function(import_header_library name include_dir)
set(${_upper_name}_INCLUDE_DIR ${include_dir} CACHE FILEPATH
"Path to ${name} include directory" FORCE)
mark_as_advanced(${_upper_name}_INCLUDE_DIR)
add_library(lib::${name} ALIAS ${name})
endfunction(import_header_library)
function(import_library name type location include_dir)
@ -257,3 +258,6 @@ import_external_library(librdtsc STATIC
${CMAKE_CURRENT_SOURCE_DIR}/librdtsc/include
CMAKE_ARGS ${MG_LIBRDTSC_CMAKE_ARGS}
BUILD_COMMAND $(MAKE) rdtsc)
# setup ctre
import_header_library(ctre ${CMAKE_CURRENT_SOURCE_DIR})

View File

@ -122,6 +122,7 @@ declare -A primary_urls=(
["protobuf"]="http://$local_cache_host/git/protobuf.git"
["pulsar"]="http://$local_cache_host/git/pulsar.git"
["librdtsc"]="http://$local_cache_host/git/librdtsc.git"
["ctre"]="http://$local_cache_host/file/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
)
# The goal of secondary urls is to have links to the "source of truth" of
@ -147,6 +148,7 @@ declare -A secondary_urls=(
["protobuf"]="https://github.com/protocolbuffers/protobuf.git"
["pulsar"]="https://github.com/apache/pulsar.git"
["librdtsc"]="https://github.com/gabrieleara/librdtsc.git"
["ctre"]="https://raw.githubusercontent.com/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
)
# antlr
@ -238,3 +240,9 @@ repo_clone_try_double "${primary_urls[librdtsc]}" "${secondary_urls[librdtsc]}"
pushd librdtsc
git apply ../librdtsc.patch
popd
# ctre
# Fetched as a single header file (see the ctre entries in primary_urls / secondary_urls)
# into its own ./ctre directory.
# NOTE(review): file_get_try_double presumably tries the primary URL first and falls back
# to the secondary one - confirm against its definition.
mkdir -p ctre
cd ctre
file_get_try_double "${primary_urls[ctre]}" "${secondary_urls[ctre]}"
cd ..

View File

@ -1,6 +1,7 @@
# CMake configuration for the main memgraph library and executable
# add memgraph sub libraries, ordered by dependency
add_subdirectory(csv)
add_subdirectory(utils)
add_subdirectory(requests)
add_subdirectory(io)

18
src/csv/CMakeLists.txt Normal file
View File

@ -0,0 +1,18 @@
# CSV reading library: static target mg-csv, consumed through the mg::csv alias.

# Boost.Iostreams is linked privately below, so locate it first.
find_package(Boost REQUIRED COMPONENTS iostreams)

add_library(mg-csv STATIC)
add_library(mg::csv ALIAS mg-csv)

# The public header is listed as a PUBLIC source; the implementation stays PRIVATE.
target_sources(mg-csv PUBLIC include/csv/parsing.hpp)
target_sources(mg-csv PRIVATE parsing.cpp)

# Consumers include the header as "csv/parsing.hpp".
target_include_directories(mg-csv PUBLIC include)

# mg-utils is a PUBLIC dependency; ctre, mg-requests and Boost.Iostreams are linked PRIVATE.
target_link_libraries(mg-csv PUBLIC mg-utils)
target_link_libraries(mg-csv PRIVATE lib::ctre mg-requests Boost::iostreams)

# Fuzzing harness for the reader.
add_subdirectory(fuzz)

View File

@ -0,0 +1,5 @@
# Fuzzing harness for the CSV reader (defines LLVMFuzzerTestOneInput).
# EXCLUDE_FROM_ALL keeps it out of the default build; it must be requested explicitly.
add_executable(fuzz_csv EXCLUDE_FROM_ALL fuzz_reader.cpp)

# -fsanitize=fuzzer is needed both when compiling and when linking.
target_compile_options(fuzz_csv PRIVATE -fsanitize=fuzzer)
target_link_libraries(fuzz_csv PRIVATE mg::csv -fsanitize=fuzzer)

View File

@ -0,0 +1,78 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <cstdint>
#include <optional>
#include <sstream>
#include "csv/parsing.hpp"
#include "utils/string.hpp"
namespace mg = memgraph;
namespace csv = mg::csv;
using pmr_str = mg::utils::pmr::string;
extern "C" int LLVMFuzzerTestOneInput(std::uint8_t const *data, std::size_t size) {
// need to parse a config
if (size < 4) return 0;
auto const with_header = bool(data[0]);
auto const ignore_bad = bool(data[1]);
auto const delim_size = data[2];
auto const quote_size = data[3];
// 0 will be nullopt, everything else will be one smaller
// 0 , 1, 2, 3, 4
// nullopt, 0, 1, 2, 3
auto const delim_real_size = delim_size == 0 ? 0 : delim_size - 1;
auto const quote_real_size = quote_size == 0 ? 0 : quote_size - 1;
// not worth testing if too large
if (delim_real_size > 3 || quote_real_size > 3) return 0;
// is there enough space for them to exist
if (size < 4 + delim_real_size + quote_real_size) return 0;
auto const delim_start = 4;
auto const delim_end = delim_start + delim_real_size;
auto const quote_start = delim_end;
auto const quote_end = quote_start + quote_real_size;
auto *mem = mg::utils::NewDeleteResource();
auto delim = delim_size == 0 ? std::optional<pmr_str>{} : pmr_str(&data[delim_start], &data[delim_end], mem);
auto quote = quote_size == 0 ? std::optional<pmr_str>{} : pmr_str(&data[quote_start], &data[quote_end], mem);
auto const remaining = static_cast<int64_t>(size) - quote_end;
if (remaining < 0) __builtin_trap(); // if this hits, above parsing is wrong
// #############################################################################################################
// build Config
auto cfg = csv::Reader::Config{with_header, ignore_bad, std::move(delim), std::move(quote)};
// build CSV source
auto ss = std::stringstream{};
ss.write(reinterpret_cast<char const *>(&data[quote_end]), static_cast<std::streamsize>(remaining));
auto source = csv::StreamCsvSource{std::move(ss)};
// #############################################################################################################
try {
auto reader = memgraph::csv::Reader(std::move(source), std::move(cfg));
auto const header = reader.GetHeader();
asm volatile("" : : "g"(header) : "memory");
while (true) {
auto row = reader.GetNextRow(mem);
if (!row) break;
asm volatile("" : : "g"(row) : "memory");
}
} catch (csv::CsvReadException const &) {
// CsvReadException is ok
}
return 0;
}

View File

@ -23,6 +23,7 @@
#include <fstream>
#include <optional>
#include <string>
#include <variant>
#include <vector>
#include "utils/exceptions.hpp"
@ -36,13 +37,53 @@ class CsvReadException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/// CSV source backed by a file on the local filesystem.
class FileCsvSource {
public:
/// Opens the file at `path`.
/// @throws CsvReadException if the file does not exist or could not be opened.
explicit FileCsvSource(std::filesystem::path path);
/// Returns the underlying file stream.
std::istream &GetStream();
private:
// Path the stream was opened from; also used in error messages.
std::filesystem::path path_;
std::ifstream stream_;
};
/// CSV source backed by an in-memory std::stringstream that it takes ownership of.
class StreamCsvSource {
public:
/// Takes ownership of `stream` by moving it into the source.
StreamCsvSource(std::stringstream stream) : stream_{std::move(stream)} {}
/// Returns the owned stream.
std::istream &GetStream() { return stream_; }
private:
std::stringstream stream_;
};
/// CSV source whose content is downloaded from a URL into the inherited in-memory stream
/// at construction time.
class UrlCsvSource : public StreamCsvSource {
public:
/// Downloads the content found at `url`.
/// @throws CsvReadException if the content could not be fetched.
UrlCsvSource(char const *url);
};
/// Type-erased CSV source: holds exactly one of the concrete source kinds and exposes its
/// input stream.
class CsvSource {
public:
/// Builds the source matching `csv_location`: a UrlCsvSource when the location starts with
/// http://, https:// or ftp://, otherwise a FileCsvSource.
static auto Create(utils::pmr::string const &csv_location) -> CsvSource;
// Intentionally non-explicit so any concrete source converts to CsvSource.
CsvSource(FileCsvSource source) : source_{std::move(source)} {}
CsvSource(StreamCsvSource source) : source_{std::move(source)} {}
CsvSource(UrlCsvSource source) : source_{std::move(source)} {}
/// Returns the input stream of whichever source is held.
std::istream &GetStream();
private:
std::variant<FileCsvSource, UrlCsvSource, StreamCsvSource> source_;
};
class Reader {
public:
struct Config {
Config() = default;
Config(const bool with_header, const bool ignore_bad, std::optional<utils::pmr::string> delim,
std::optional<utils::pmr::string> qt)
: with_header(with_header), ignore_bad(ignore_bad), delimiter(std::move(delim)), quote(std::move(qt)) {}
: with_header(with_header), ignore_bad(ignore_bad), delimiter(std::move(delim)), quote(std::move(qt)) {
// delimiter + quote can not be empty
if (delimiter && delimiter->empty()) delimiter.reset();
if (quote && quote->empty()) quote.reset();
}
bool with_header{false};
bool ignore_bad{false};
@ -53,16 +94,7 @@ class Reader {
using Row = utils::pmr::vector<utils::pmr::string>;
using Header = utils::pmr::vector<utils::pmr::string>;
Reader() = default;
explicit Reader(std::filesystem::path path, Config cfg, utils::MemoryResource *mem = utils::NewDeleteResource())
: memory_(mem), path_(std::move(path)) {
read_config_.with_header = cfg.with_header;
read_config_.ignore_bad = cfg.ignore_bad;
read_config_.delimiter = cfg.delimiter ? std::move(*cfg.delimiter) : utils::pmr::string{",", memory_};
read_config_.quote = cfg.quote ? std::move(*cfg.quote) : utils::pmr::string{"\"", memory_};
InitializeStream();
TryInitializeHeader();
}
explicit Reader(CsvSource source, Config cfg, utils::MemoryResource *mem = utils::NewDeleteResource());
Reader(const Reader &) = delete;
Reader &operator=(const Reader &) = delete;
@ -81,28 +113,18 @@ class Reader {
};
using ParsingResult = utils::BasicResult<ParseError, Row>;
[[nodiscard]] bool HasHeader() const;
const Header &GetHeader() const;
std::optional<Row> GetNextRow(utils::MemoryResource *mem);
bool HasHeader() const;
auto GetHeader() const -> Header const &;
auto GetNextRow(utils::MemoryResource *mem) -> std::optional<Row>;
private:
utils::MemoryResource *memory_;
std::filesystem::path path_;
std::ifstream csv_stream_;
Config read_config_;
uint64_t line_count_{1};
uint16_t number_of_columns_{0};
Header header_{memory_};
void InitializeStream();
void TryInitializeHeader();
std::optional<utils::pmr::string> GetNextLine(utils::MemoryResource *mem);
ParsingResult ParseHeader();
ParsingResult ParseRow(utils::MemoryResource *mem);
// Some implementation issues that need clearing up, but this is mainly because
// I don't want `boost/iostreams/filtering_stream.hpp` included in this header file
// Because it causes issues when combined with antlr headers
// When we have C++20 modules this can be fixed
struct impl;
std::unique_ptr<impl, void (*)(impl *)> pimpl;
};
} // namespace memgraph::csv

View File

@ -9,33 +9,132 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "utils/csv_parsing.hpp"
#include "csv/parsing.hpp"
#include <string_view>
#include <boost/iostreams/filter/bzip2.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include <ctre/ctre.hpp>
#include "requests/requests.hpp"
#include "utils/file.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/string.hpp"
using PlainStream = boost::iostreams::filtering_istream;
namespace memgraph::csv {
using ParseError = Reader::ParseError;
void Reader::InitializeStream() {
if (!std::filesystem::exists(path_)) {
throw CsvReadException("CSV file not found: {}", path_.string());
}
csv_stream_.open(path_);
if (!csv_stream_.good()) {
throw CsvReadException("CSV file {} couldn't be opened!", path_.string());
}
/// Private implementation of Reader. It lives in this translation unit so the Boost.Iostreams
/// filtering stream type does not have to appear in the public header.
struct Reader::impl {
/// Stores the source and configuration, then initialises the stream and (optionally) the header.
impl(CsvSource source, Reader::Config cfg, utils::MemoryResource *mem);
/// True when the configuration says the first line is a header.
[[nodiscard]] bool HasHeader() const { return read_config_.with_header; }
/// Returns the stored header.
[[nodiscard]] auto Header() const -> Header const & { return header_; }
auto GetNextRow(utils::MemoryResource *mem) -> std::optional<Reader::Row>;
private:
void InitializeStream();
void TryInitializeHeader();
std::optional<utils::pmr::string> GetNextLine(utils::MemoryResource *mem);
ParsingResult ParseHeader();
ParsingResult ParseRow(utils::MemoryResource *mem);
// memory_ must stay declared before header_, whose initialiser uses it.
utils::MemoryResource *memory_;
// NOTE(review): path_ is never assigned in the visible code - confirm whether it is still needed.
std::filesystem::path path_;
// Owns the underlying input stream that csv_stream_ reads from.
CsvSource source_;
// Filtering stream: an optional decompressor followed by the source stream.
PlainStream csv_stream_;
Config read_config_;
uint64_t line_count_{1};
uint16_t number_of_columns_{0};
Reader::Header header_{memory_};
};
/// Stores the source and memory resource, copies the flags from `cfg`, fills in the default
/// delimiter (",") and quote ("\"") when none was supplied, then opens the stream and reads
/// the header if one is expected.
Reader::impl::impl(CsvSource source, Reader::Config cfg, utils::MemoryResource *mem)
    : memory_(mem), source_(std::move(source)) {
  read_config_.with_header = cfg.with_header;
  read_config_.ignore_bad = cfg.ignore_bad;

  if (cfg.delimiter) {
    read_config_.delimiter = std::move(*cfg.delimiter);
  } else {
    read_config_.delimiter = utils::pmr::string{",", memory_};
  }

  if (cfg.quote) {
    read_config_.quote = std::move(*cfg.quote);
  } else {
    read_config_.quote = utils::pmr::string{"\"", memory_};
  }

  InitializeStream();
  TryInitializeHeader();
}
std::optional<utils::pmr::string> Reader::GetNextLine(utils::MemoryResource *mem) {
/// Compression formats the reader can detect on its input; NONE means the input is read as-is.
enum class CompressionMethod : uint8_t {
NONE,
GZip,
BZip2,
};
/// Detect compression based on magic sequences.
///
/// Peeks at the first bytes of `is` and compares them with the gzip (0x1F 0x8B) and
/// bzip2 (0x42 0x5A 0x68) signatures. The stream is always rewound to its beginning
/// before returning, so the caller can read the content from the start.
///
/// @param is seekable input stream positioned at its beginning
/// @return the detected method, or CompressionMethod::NONE when no signature matches
///         (including when the input is shorter than the signature)
auto DetectCompressionMethod(std::istream &is) -> CompressionMethod {
  // Ensure stream is reset.
  // A short input (e.g. a one-byte file) makes the probing below hit end-of-file, which sets
  // eofbit/failbit; seekg does nothing on a failed stream, so those bits are dropped first.
  // badbit is deliberately preserved so that a genuine I/O error is not hidden.
  auto const on_exit = utils::OnScopeExit{[&]() {
    is.clear(is.rdstate() & std::ios_base::badbit);
    is.seekg(0, std::ios_base::beg);
  }};

  // Note we must use bytes for comparison, not char
  //
  std::byte c{};  // this gets reused
  auto const next_byte = [&](std::byte &b) { return bool(is.get(reinterpret_cast<char &>(b))); };
  if (!next_byte(c)) return CompressionMethod::NONE;

  auto const as_bytes = []<typename... Args>(Args... args) {
    return std::array<std::byte, sizeof...(Args)>{std::byte(args)...};
  };

  // Gzip - 0x1F8B
  constexpr static auto gzip_seq = as_bytes(0x1F, 0x8B);
  if (c == gzip_seq[0]) {
    if (!next_byte(c) || c != gzip_seq[1]) return CompressionMethod::NONE;
    return CompressionMethod::GZip;
  }

  // BZip2 - 0x425A68
  constexpr static auto bzip_seq = as_bytes(0x42, 0x5A, 0x68);
  if (c == bzip_seq[0]) {
    if (!next_byte(c) || c != bzip_seq[1]) return CompressionMethod::NONE;
    if (!next_byte(c) || c != bzip_seq[2]) return CompressionMethod::NONE;
    return CompressionMethod::BZip2;
  }
  return CompressionMethod::NONE;
}
// Builds the private implementation. The deleter is supplied here, where `impl` is a complete
// type, because the header only forward-declares it.
Reader::Reader(CsvSource source, Reader::Config cfg, utils::MemoryResource *mem)
: pimpl{new impl{std::move(source), std::move(cfg), mem}, [](impl *p) { delete p; }} {}
void Reader::impl::InitializeStream() {
auto &source = source_.GetStream();
auto const method = DetectCompressionMethod(source);
switch (method) {
case CompressionMethod::GZip:
csv_stream_.push(boost::iostreams::gzip_decompressor{});
break;
case CompressionMethod::BZip2:
csv_stream_.push(boost::iostreams::bzip2_decompressor{});
break;
default:
break;
}
csv_stream_.push(source);
MG_ASSERT(csv_stream_.auto_close(), "Should be 'auto close' for correct operation");
MG_ASSERT(csv_stream_.is_complete(), "Should be 'complete' for correct operation");
}
std::optional<utils::pmr::string> Reader::impl::GetNextLine(utils::MemoryResource *mem) {
utils::pmr::string line(mem);
if (!std::getline(csv_stream_, line)) {
// reached end of file or an I/0 error occurred
if (!csv_stream_.good()) {
csv_stream_.close();
csv_stream_.reset(); // this will close the file_stream_ and clear the chain
}
return std::nullopt;
}
@ -43,13 +142,13 @@ std::optional<utils::pmr::string> Reader::GetNextLine(utils::MemoryResource *mem
return std::move(line);
}
Reader::ParsingResult Reader::ParseHeader() {
Reader::ParsingResult Reader::impl::ParseHeader() {
// header must be the very first line in the file
MG_ASSERT(line_count_ == 1, "Invalid use of {}", __func__);
return ParseRow(memory_);
}
void Reader::TryInitializeHeader() {
void Reader::impl::TryInitializeHeader() {
if (!HasHeader()) {
return;
}
@ -67,16 +166,16 @@ void Reader::TryInitializeHeader() {
header_ = std::move(*header);
}
[[nodiscard]] bool Reader::HasHeader() const { return read_config_.with_header; }
[[nodiscard]] bool Reader::HasHeader() const { return pimpl->HasHeader(); }
const Reader::Header &Reader::GetHeader() const { return header_; }
const Reader::Header &Reader::GetHeader() const { return pimpl->Header(); }
namespace {
enum class CsvParserState : uint8_t { INITIAL_FIELD, NEXT_FIELD, QUOTING, EXPECT_DELIMITER, DONE };
} // namespace
Reader::ParsingResult Reader::ParseRow(utils::MemoryResource *mem) {
Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
utils::pmr::vector<utils::pmr::string> row(mem);
if (number_of_columns_ != 0) {
row.reserve(number_of_columns_);
@ -140,9 +239,10 @@ Reader::ParsingResult Reader::ParseRow(utils::MemoryResource *mem) {
break;
}
case CsvParserState::QUOTING: {
const auto quote_size = read_config_.quote->size();
const auto quote_now = utils::StartsWith(line_string_view, *read_config_.quote);
const auto quote_next =
utils::StartsWith(line_string_view.substr(read_config_.quote->size()), *read_config_.quote);
const auto quote_next = quote_size <= line_string_view.size() &&
utils::StartsWith(line_string_view.substr(quote_size), *read_config_.quote);
if (quote_now && quote_next) {
// This is an escaped quote character.
column += *read_config_.quote;
@ -216,12 +316,7 @@ Reader::ParsingResult Reader::ParseRow(utils::MemoryResource *mem) {
return std::move(row);
}
// Returns Reader::Row if the read row if valid;
// Returns std::nullopt if end of file is reached or an error occurred
// making it unreadable;
// @throws CsvReadException if a bad row is encountered, and the ignore_bad is set
// to 'true' in the Reader::Config.
std::optional<Reader::Row> Reader::GetNextRow(utils::MemoryResource *mem) {
std::optional<Reader::Row> Reader::impl::GetNextRow(utils::MemoryResource *mem) {
auto row = ParseRow(mem);
if (row.HasError()) {
@ -245,4 +340,44 @@ std::optional<Reader::Row> Reader::GetNextRow(utils::MemoryResource *mem) {
return std::move(*row);
}
// Returns Reader::Row if the read row is valid;
// Returns std::nullopt if end of file is reached or an error occurred
// making it unreadable;
// @throws CsvReadException if a bad row is encountered, and the ignore_bad is set
// to 'false' in the Reader::Config.
std::optional<Reader::Row> Reader::GetNextRow(utils::MemoryResource *mem) { return pimpl->GetNextRow(mem); }
/// Opens the CSV file at `path` for reading.
/// @throws CsvReadException if the file does not exist or the stream is not usable after opening.
FileCsvSource::FileCsvSource(std::filesystem::path path) : path_(std::move(path)) {
  bool const file_present = std::filesystem::exists(path_);
  if (!file_present) {
    throw CsvReadException("CSV file not found: {}", path_.string());
  }

  stream_.open(path_);
  if (stream_.good()) {
    return;
  }
  throw CsvReadException("CSV file {} couldn't be opened!", path_.string());
}
std::istream &FileCsvSource::GetStream() { return stream_; }
/// Forwards to GetStream() of whichever concrete source the variant currently holds.
std::istream &CsvSource::GetStream() {
  auto const stream_of = [](auto &held) -> std::istream & { return held.GetStream(); };
  return std::visit(stream_of, source_);
}
/// Chooses the source kind from the location string: locations starting with http://, https://
/// or ftp:// are downloaded; everything else is treated as a local file path.
auto CsvSource::Create(const utils::pmr::string &csv_location) -> CsvSource {
  constexpr auto is_remote_location = ctre::starts_with<"(https?|ftp)://">;
  if (!is_remote_location(csv_location)) {
    return csv::FileCsvSource{csv_location};
  }
  return csv::UrlCsvSource{csv_location.c_str()};
}
// Helper for UrlCsvSource: downloads the content at `url` into a string stream.
// File-local (internal linkage) - it is only used by the UrlCsvSource constructor below.
// @throws CsvReadException if the download fails.
static auto urlToStringStream(const char *url) -> std::stringstream {
  auto ss = std::stringstream{};
  if (!requests::DownloadToStream(url, ss)) {
    throw CsvReadException("CSV was unable to be fetched from {}", url);
  }
  return ss;
}
UrlCsvSource::UrlCsvSource(const char *url) : StreamCsvSource{urlToStringStream(url)} {}
} // namespace memgraph::csv

View File

@ -37,12 +37,9 @@ set(mg_query_sources
graph.cpp
db_accessor.cpp)
find_package(Boost REQUIRED)
add_library(mg-query STATIC ${mg_query_sources})
target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(mg-query dl cppitertools Boost::headers)
target_link_libraries(mg-query mg-integrations-pulsar mg-integrations-kafka mg-storage-v2 mg-license mg-utils mg-kvstore mg-memory)
target_link_libraries(mg-query PUBLIC dl cppitertools Python3::Python mg-integrations-pulsar mg-integrations-kafka mg-storage-v2 mg-license mg-utils mg-kvstore mg-memory mg::csv)
if(NOT "${MG_PYTHON_PATH}" STREQUAL "")
set(Python3_ROOT_DIR "${MG_PYTHON_PATH}")
endif()
@ -51,7 +48,6 @@ if("${MG_PYTHON_VERSION}" STREQUAL "")
else()
find_package(Python3 "${MG_PYTHON_VERSION}" EXACT REQUIRED COMPONENTS Development)
endif()
target_link_libraries(mg-query Python3::Python)
# Generate Antlr openCypher parser
@ -94,4 +90,4 @@ add_library(antlr_opencypher_parser_lib STATIC ${antlr_opencypher_generated_src}
add_dependencies(antlr_opencypher_parser_lib generate_opencypher_parser)
target_link_libraries(antlr_opencypher_parser_lib antlr4)
target_link_libraries(mg-query antlr_opencypher_parser_lib)
target_link_libraries(mg-query PUBLIC antlr_opencypher_parser_lib)

View File

@ -30,6 +30,7 @@
#include <variant>
#include "auth/models.hpp"
#include "csv/parsing.hpp"
#include "glue/communication.hpp"
#include "license/license.hpp"
#include "memory/memory_control.hpp"
@ -64,7 +65,6 @@
#include "storage/v2/storage_mode.hpp"
#include "utils/algorithm.hpp"
#include "utils/build_info.hpp"
#include "utils/csv_parsing.hpp"
#include "utils/event_counter.hpp"
#include "utils/event_histogram.hpp"
#include "utils/exceptions.hpp"

View File

@ -29,6 +29,7 @@
#include "query/common.hpp"
#include "spdlog/spdlog.h"
#include "csv/parsing.hpp"
#include "license/license.hpp"
#include "query/auth_checker.hpp"
#include "query/context.hpp"
@ -47,7 +48,6 @@
#include "storage/v2/property_value.hpp"
#include "storage/v2/view.hpp"
#include "utils/algorithm.hpp"
#include "utils/csv_parsing.hpp"
#include "utils/event_counter.hpp"
#include "utils/exceptions.hpp"
#include "utils/fnv.hpp"
@ -4822,7 +4822,7 @@ class LoadCsvCursor : public Cursor {
// persists between pulls, so it can't use the evalutation context memory
// resource.
return csv::Reader(
*maybe_file,
csv::CsvSource::Create(*maybe_file),
csv::Reader::Config(self_->with_header_, self_->ignore_bad_, std::move(maybe_delim), std::move(maybe_quote)),
utils::NewDeleteResource());
}

View File

@ -7,5 +7,7 @@ find_package(gflags REQUIRED)
add_library(mg-requests STATIC ${requests_src_files})
target_link_libraries(mg-requests mg-utils spdlog::spdlog fmt::fmt gflags json ${CURL_LIBRARIES})
target_link_libraries(mg-requests
PUBLIC mg-utils spdlog::spdlog fmt::fmt gflags json ${CURL_LIBRARIES}
PRIVATE lib::ctre)
target_include_directories(mg-requests PRIVATE ${CURL_INCLUDE_DIRS})

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -16,6 +16,7 @@
#include <curl/curl.h>
#include <fmt/format.h>
#include <gflags/gflags.h>
#include <ctre/ctre.hpp>
#include "utils/logging.hpp"
@ -113,4 +114,35 @@ bool CreateAndDownloadFile(const std::string &url, const std::string &path, int
return true;
}
/// Downloads the content found at `url` and writes it to `os`.
///
/// @param url null-terminated URL handed to libcurl as-is
/// @param os  output stream that receives the downloaded bytes
/// @return true if the transfer succeeded; false if curl could not be initialised, the
///         transfer failed (including a failed write to `os`), or an HTTP(S) request was
///         answered with a status other than 200.
auto DownloadToStream(char const *url, std::ostream &os) -> bool {
  // libcurl invokes the callback as size_t(char *, size_t, size_t, void *), so the user data
  // is received as void * and cast back to the stream it really points to.
  constexpr auto WriteCallback = [](char *ptr, size_t size, size_t nmemb, void *userdata) -> size_t {
    auto *out = static_cast<std::ostream *>(userdata);
    auto const totalSize = static_cast<std::streamsize>(size * nmemb);
    out->write(ptr, totalSize);
    // Returning anything other than the number of bytes received makes libcurl abort the
    // transfer with a write error, so a failed stream is not silently ignored.
    return out->good() ? static_cast<size_t>(totalSize) : 0;
  };

  auto *curl_handle{curl_easy_init()};
  if (curl_handle == nullptr) {
    SPDLOG_WARN("Couldn't initialize curl handle");
    return false;
  }

  curl_easy_setopt(curl_handle, CURLOPT_URL, url);
  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, +WriteCallback);
  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, static_cast<void *>(&os));

  auto const res = curl_easy_perform(curl_handle);
  long response_code = 0;  // NOLINT
  curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code);
  curl_easy_cleanup(curl_handle);

  if (res != CURLE_OK) {
    SPDLOG_WARN("Couldn't perform request: {}", curl_easy_strerror(res));
    return false;
  }

  // Only HTTP(S) answers with status 200 on success. For other protocols (e.g. FTP) the
  // reported code is that protocol's own last reply, so CURLE_OK above is the success signal.
  constexpr auto http_matcher = ctre::starts_with<"https?://">;
  if (http_matcher(url) && response_code != 200) {
    SPDLOG_WARN("Request response code isn't 200 (received {})!", response_code);
    return false;
  }

  return true;
}
} // namespace memgraph::requests

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -11,8 +11,11 @@
#pragma once
#include <istream>
#include <sstream>
#include <string>
#include <curl/curl.h>
#include <json/json.hpp>
namespace memgraph::requests {
@ -47,4 +50,16 @@ bool RequestPostJson(const std::string &url, const nlohmann::json &data, int tim
*/
bool CreateAndDownloadFile(const std::string &url, const std::string &path, int timeout_in_seconds = 10);
/**
* Downloads content into a stream
*
* This function sends a GET request and puts the response into a stream.
* Using c-string because internals interop with a C API
*
* @param url url of the contents
* @param os an output stream
* @return bool true if the request was successful, false otherwise.
*/
auto DownloadToStream(char const *url, std::ostream &os) -> bool;
} // namespace memgraph::requests

View File

@ -4,7 +4,6 @@ set(utils_src_files
event_counter.cpp
event_gauge.cpp
event_histogram.cpp
csv_parsing.cpp
file.cpp
file_locker.cpp
memory.cpp

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -34,17 +34,25 @@ namespace memgraph::utils {
* // long block of code, might throw an exception
* }
*/
class OnScopeExit {
template <typename Callable>
class [[nodiscard]] OnScopeExit {
public:
explicit OnScopeExit(const std::function<void()> &function) : function_(function) {}
~OnScopeExit() { function_(); }
void Disable() {
function_ = [] {};
explicit OnScopeExit(Callable &&function) : function_{std::forward<Callable>(function)}, doCall_{true} {}
OnScopeExit(OnScopeExit const &) = delete;
OnScopeExit(OnScopeExit &&) = delete;
OnScopeExit &operator=(OnScopeExit const &) = delete;
OnScopeExit &operator=(OnScopeExit &&) = delete;
~OnScopeExit() {
if (doCall_) function_();
}
void Disable() { doCall_ = false; }
private:
std::function<void()> function_;
bool doCall_;
};
template <typename Callable>
OnScopeExit(Callable &&) -> OnScopeExit<Callable>;
} // namespace memgraph::utils

View File

@ -272,8 +272,8 @@ target_link_libraries(${test_prefix}utils_file_locker mg-utils fmt)
add_unit_test(utils_thread_pool.cpp)
target_link_libraries(${test_prefix}utils_thread_pool mg-utils fmt)
add_unit_test(utils_csv_parsing.cpp ${CMAKE_SOURCE_DIR}/src/utils/csv_parsing.cpp)
target_link_libraries(${test_prefix}utils_csv_parsing mg-utils fmt)
add_unit_test(csv_csv_parsing.cpp)
target_link_libraries(${test_prefix}csv_csv_parsing mg::csv)
add_unit_test(utils_async_timer.cpp)
target_link_libraries(${test_prefix}utils_async_timer mg-utils)

View File

@ -9,13 +9,29 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "csv/parsing.hpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "utils/csv_parsing.hpp"
#include "utils/string.hpp"
class CsvReaderTest : public ::testing::TestWithParam<const char *> {
#include <boost/iostreams/filter/bzip2.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filtering_stream.hpp>
using namespace memgraph::csv;
enum class CompressionMethod : uint8_t {
NONE,
GZip,
BZip2,
};
struct TestParam {
const char *newline;
CompressionMethod compressionMethod;
};
class CsvReaderTest : public ::testing::TestWithParam<TestParam> {
protected:
const std::filesystem::path csv_directory{std::filesystem::temp_directory_path() / "csv_testing"};
@ -41,8 +57,9 @@ class CsvReaderTest : public ::testing::TestWithParam<const char *> {
namespace {
class FileWriter {
public:
explicit FileWriter(const std::filesystem::path path, std::string newline = "\n") : newline_{std::move(newline)} {
stream_.open(path);
explicit FileWriter(std::filesystem::path path, std::string newline, CompressionMethod compressionMethod)
: newline_{std::move(newline)}, compressionMethod_{compressionMethod}, path_{std::move(path)} {
stream_.open(path_);
}
FileWriter(const FileWriter &) = delete;
@ -51,7 +68,25 @@ class FileWriter {
FileWriter(FileWriter &&) = delete;
FileWriter &operator=(FileWriter &&) = delete;
void Close() { stream_.close(); }
// Closes the plain-text file. When a compression method is configured, the file content is
// then compressed into a temporary sibling file which replaces the original.
void Close() {
  stream_.close();
  if (compressionMethod_ == CompressionMethod::NONE) return;

  // Temporary output; the ".gz" suffix is used for every compression method.
  auto const compressed_path = std::filesystem::path{path_.string() + ".gz"};
  std::ifstream plain{path_, std::ios::binary};
  std::ofstream packed{compressed_path, std::ios::binary | std::ios::trunc};

  boost::iostreams::filtering_ostream chain;
  switch (compressionMethod_) {
    case CompressionMethod::GZip:
      chain.push(boost::iostreams::gzip_compressor());
      break;
    case CompressionMethod::BZip2:
      chain.push(boost::iostreams::bzip2_compressor());
      break;
    default:
      break;
  }
  chain.push(packed);

  chain << plain.rdbuf();

  // Tear the chain down before closing the output file.
  plain.close();
  chain.reset();
  packed.close();

  std::filesystem::remove(path_);
  std::filesystem::rename(compressed_path, path_);
}
size_t WriteLine(const std::string_view line) {
if (!stream_.is_open()) {
@ -67,6 +102,8 @@ class FileWriter {
private:
std::ofstream stream_;
std::string newline_;
CompressionMethod compressionMethod_;
std::filesystem::path path_;
};
std::string CreateRow(const std::vector<std::string> &columns, const std::string_view delim) {
@ -86,7 +123,7 @@ auto ToPmrColumns(const std::vector<std::string> &columns) {
TEST_P(CsvReaderTest, CommaDelimiter) {
// create a file with a single valid row;
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath, GetParam());
auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod);
const std::vector<std::string> columns{"A", "B", "C"};
writer.WriteLine(CreateRow(columns, ","));
@ -100,8 +137,8 @@ TEST_P(CsvReaderTest, CommaDelimiter) {
memgraph::utils::pmr::string delimiter{",", mem};
memgraph::utils::pmr::string quote{"\"", mem};
memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = memgraph::csv::Reader(filepath, cfg, mem);
Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = Reader(FileCsvSource{filepath}, cfg, mem);
auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(*parsed_row, ToPmrColumns(columns));
@ -109,7 +146,7 @@ TEST_P(CsvReaderTest, CommaDelimiter) {
TEST_P(CsvReaderTest, SemicolonDelimiter) {
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath, GetParam());
auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod);
memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource());
@ -123,8 +160,8 @@ TEST_P(CsvReaderTest, SemicolonDelimiter) {
const bool with_header = false;
const bool ignore_bad = false;
const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = memgraph::csv::Reader(filepath, cfg, mem);
const Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = Reader(FileCsvSource{filepath}, cfg, mem);
auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(*parsed_row, ToPmrColumns(columns));
@ -135,7 +172,7 @@ TEST_P(CsvReaderTest, SkipBad) {
// missing closing quote);
// the last row is valid;
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath, GetParam());
auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod);
memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource());
@ -156,8 +193,8 @@ TEST_P(CsvReaderTest, SkipBad) {
// parser's output should be solely the valid row;
const bool with_header = false;
const bool ignore_bad = true;
const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = memgraph::csv::Reader(filepath, cfg, mem);
const Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = Reader(FileCsvSource{filepath}, cfg, mem);
auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(*parsed_row, ToPmrColumns(columns_good));
@ -168,10 +205,10 @@ TEST_P(CsvReaderTest, SkipBad) {
// an exception must be thrown;
const bool with_header = false;
const bool ignore_bad = false;
const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = memgraph::csv::Reader(filepath, cfg, mem);
const Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = Reader(FileCsvSource{filepath}, cfg, mem);
EXPECT_THROW(reader.GetNextRow(mem), memgraph::csv::CsvReadException);
EXPECT_THROW(reader.GetNextRow(mem), CsvReadException);
}
}
@ -179,7 +216,7 @@ TEST_P(CsvReaderTest, AllRowsValid) {
// create a file with all rows valid;
// parser should return 'std::nullopt'
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath, GetParam());
auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod);
memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource());
@ -195,8 +232,8 @@ TEST_P(CsvReaderTest, AllRowsValid) {
const bool with_header = false;
const bool ignore_bad = false;
const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = memgraph::csv::Reader(filepath, cfg);
const Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = Reader(FileCsvSource{filepath}, cfg);
const auto pmr_columns = ToPmrColumns(columns);
while (auto parsed_row = reader.GetNextRow(mem)) {
@ -208,7 +245,7 @@ TEST_P(CsvReaderTest, SkipAllRows) {
// create a file with all rows invalid (containing a string with a missing closing quote);
// parser should return 'std::nullopt'
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath, GetParam());
auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod);
memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource());
@ -224,8 +261,8 @@ TEST_P(CsvReaderTest, SkipAllRows) {
const bool with_header = false;
const bool ignore_bad = true;
const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = memgraph::csv::Reader(filepath, cfg);
const Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = Reader(FileCsvSource{filepath}, cfg);
auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(parsed_row, std::nullopt);
@ -233,7 +270,7 @@ TEST_P(CsvReaderTest, SkipAllRows) {
TEST_P(CsvReaderTest, WithHeader) {
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath, GetParam());
auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod);
memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource());
@ -251,8 +288,8 @@ TEST_P(CsvReaderTest, WithHeader) {
const bool with_header = true;
const bool ignore_bad = false;
const memgraph::csv::Reader::Config cfg(with_header, ignore_bad, delimiter, quote);
auto reader = memgraph::csv::Reader(filepath, cfg);
const Reader::Config cfg(with_header, ignore_bad, delimiter, quote);
auto reader = Reader(FileCsvSource{filepath}, cfg);
const auto pmr_header = ToPmrColumns(header);
ASSERT_EQ(reader.GetHeader(), pmr_header);
@ -268,7 +305,7 @@ TEST_P(CsvReaderTest, MultilineQuotedString) {
// string spanning two lines;
// parser should return two valid rows
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath, GetParam());
auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod);
memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource());
@ -287,8 +324,8 @@ TEST_P(CsvReaderTest, MultilineQuotedString) {
const bool with_header = false;
const bool ignore_bad = true;
const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = memgraph::csv::Reader(filepath, cfg);
const Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = Reader(FileCsvSource{filepath}, cfg);
auto parsed_row = reader.GetNextRow(mem);
ASSERT_EQ(*parsed_row, ToPmrColumns(first_row));
@ -302,7 +339,7 @@ TEST_P(CsvReaderTest, EmptyColumns) {
// create a file with all rows valid;
// parser should return 'std::nullopt'
const auto filepath = csv_directory / "bla.csv";
auto writer = FileWriter(filepath, GetParam());
auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod);
memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource());
@ -319,8 +356,8 @@ TEST_P(CsvReaderTest, EmptyColumns) {
const bool with_header = false;
const bool ignore_bad = false;
const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = memgraph::csv::Reader(filepath, cfg);
const Reader::Config cfg{with_header, ignore_bad, delimiter, quote};
auto reader = Reader(FileCsvSource{filepath}, cfg);
for (const auto &expected_row : expected_rows) {
const auto pmr_expected_row = ToPmrColumns(expected_row);
@ -330,4 +367,8 @@ TEST_P(CsvReaderTest, EmptyColumns) {
}
}
INSTANTIATE_TEST_CASE_P(NewlineParameterizedTest, CsvReaderTest, ::testing::Values("\n", "\r\n"));
// Runs every CsvReaderTest case once per TestParam listed below. Each test
// forwards GetParam().newline and GetParam().compressionMethod to FileWriter,
// so the fixture file is produced with that line terminator and compression.
// Combinations covered: "\n" and "\r\n" uncompressed, plus "\n" with GZip and
// with BZip2; "\r\n" is not combined with any compression method here.
// NOTE(review): the instantiation name still says "Newline" although the
// parameter now also varies the compression method.
INSTANTIATE_TEST_CASE_P(NewlineParameterizedTest, CsvReaderTest,
::testing::Values(TestParam{"\n", CompressionMethod::NONE},
TestParam{"\r\n", CompressionMethod::NONE},
TestParam{"\n", CompressionMethod::GZip},
TestParam{"\n", CompressionMethod::BZip2}));

View File

@ -15,6 +15,7 @@
#include "communication/bolt/v1/value.hpp"
#include "communication/result_stream_faker.hpp"
#include "csv/parsing.hpp"
#include "glue/communication.hpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@ -28,7 +29,6 @@
#include "query_common.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/csv_parsing.hpp"
#include "utils/logging.hpp"
namespace {