diff --git a/CMakeLists.txt b/CMakeLists.txt index 8f3554ac2..7a1ffe76d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,7 @@ # MemGraph CMake configuration -cmake_minimum_required(VERSION 3.8) +cmake_minimum_required(VERSION 3.12) +cmake_policy(SET CMP0076 NEW) # !! IMPORTANT !! run ./project_root/init.sh before cmake command # to download dependencies @@ -18,10 +19,12 @@ set_directory_properties(PROPERTIES CLEAN_NO_CUSTOM TRUE) # during the code coverage process find_program(CCACHE_FOUND ccache) option(USE_CCACHE "ccache:" ON) -message(STATUS "CCache: ${USE_CCACHE}") if(CCACHE_FOUND AND USE_CCACHE) set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache) set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache) + message(STATUS "CCache: Used") +else () + message(STATUS "CCache: Not used") endif(CCACHE_FOUND AND USE_CCACHE) # choose a compiler @@ -37,7 +40,14 @@ endif() # ----------------------------------------------------------------------------- -project(memgraph) +project(memgraph LANGUAGES C CXX) + +#TODO: upgrade to cmake 3.24 + CheckIPOSupported +#cmake_policy(SET CMP0138 NEW) +#include(CheckIPOSupported) +#check_ipo_supported() +#set(CMAKE_INTERPROCEDURAL_OPTIMIZATION_Release TRUE) +#set(CMAKE_INTERPROCEDURAL_OPTIMIZATION_RelWithDebInfo TRUE) # Install licenses. install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/licenses/ @@ -160,7 +170,7 @@ endif() # setup CMake module path, defines path for include() and find_package() # https://cmake.org/cmake/help/latest/variable/CMAKE_MODULE_PATH.html -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${PROJECT_SOURCE_DIR}/cmake) +list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") # custom function definitions include(functions) # ----------------------------------------------------------------------------- diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 4c5367961..da02f0bf5 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -29,6 +29,7 @@ function(import_header_library name include_dir) set(${_upper_name}_INCLUDE_DIR ${include_dir} CACHE FILEPATH "Path to ${name} include directory" FORCE) mark_as_advanced(${_upper_name}_INCLUDE_DIR) + add_library(lib::${name} ALIAS ${name}) endfunction(import_header_library) function(import_library name type location include_dir) @@ -257,3 +258,6 @@ import_external_library(librdtsc STATIC ${CMAKE_CURRENT_SOURCE_DIR}/librdtsc/include CMAKE_ARGS ${MG_LIBRDTSC_CMAKE_ARGS} BUILD_COMMAND $(MAKE) rdtsc) + +# setup ctre +import_header_library(ctre ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/libs/setup.sh b/libs/setup.sh index 42967bca9..b67aac857 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -122,6 +122,7 @@ declare -A primary_urls=( ["protobuf"]="http://$local_cache_host/git/protobuf.git" ["pulsar"]="http://$local_cache_host/git/pulsar.git" ["librdtsc"]="http://$local_cache_host/git/librdtsc.git" + ["ctre"]="http://$local_cache_host/file/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp" ) # The goal of secondary urls is to have links to the "source of truth" of @@ -147,6 +148,7 @@ declare -A secondary_urls=( ["protobuf"]="https://github.com/protocolbuffers/protobuf.git" ["pulsar"]="https://github.com/apache/pulsar.git" ["librdtsc"]="https://github.com/gabrieleara/librdtsc.git" + ["ctre"]="https://raw.githubusercontent.com/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp" ) # antlr @@ -238,3 +240,9 @@ repo_clone_try_double "${primary_urls[librdtsc]}" "${secondary_urls[librdtsc]}" pushd librdtsc git apply ../librdtsc.patch popd + +#ctre +mkdir -p ctre +cd ctre +file_get_try_double "${primary_urls[ctre]}" "${secondary_urls[ctre]}" +cd .. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index aae7c365c..91e1d31e5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,6 +1,7 @@ # CMake configuration for the main memgraph library and executable # add memgraph sub libraries, ordered by dependency +add_subdirectory(csv) add_subdirectory(utils) add_subdirectory(requests) add_subdirectory(io) diff --git a/src/csv/CMakeLists.txt b/src/csv/CMakeLists.txt new file mode 100644 index 000000000..fa41ac3d7 --- /dev/null +++ b/src/csv/CMakeLists.txt @@ -0,0 +1,18 @@ +add_library(mg-csv STATIC) +add_library(mg::csv ALIAS mg-csv) +target_sources(mg-csv + PUBLIC + include/csv/parsing.hpp + + PRIVATE + parsing.cpp + ) +target_include_directories(mg-csv PUBLIC include) + +find_package(Boost REQUIRED COMPONENTS iostreams) +target_link_libraries(mg-csv + PUBLIC mg-utils + PRIVATE lib::ctre mg-requests Boost::iostreams + ) + +add_subdirectory(fuzz) diff --git a/src/csv/fuzz/CMakeLists.txt b/src/csv/fuzz/CMakeLists.txt new file mode 100644 index 000000000..89893e7ef --- /dev/null +++ b/src/csv/fuzz/CMakeLists.txt @@ -0,0 +1,5 @@ +add_executable(fuzz_csv EXCLUDE_FROM_ALL) +target_sources(fuzz_csv PRIVATE fuzz_reader.cpp) +target_link_libraries(fuzz_csv PRIVATE mg::csv) +target_compile_options(fuzz_csv PRIVATE -fsanitize=fuzzer) +target_link_libraries(fuzz_csv PRIVATE -fsanitize=fuzzer) diff --git a/src/csv/fuzz/fuzz_reader.cpp b/src/csv/fuzz/fuzz_reader.cpp new file mode 100644 index 000000000..b936619ec --- /dev/null +++ b/src/csv/fuzz/fuzz_reader.cpp @@ -0,0 +1,78 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include + +#include "csv/parsing.hpp" +#include "utils/string.hpp" + +namespace mg = memgraph; +namespace csv = mg::csv; + +using pmr_str = mg::utils::pmr::string; + +extern "C" int LLVMFuzzerTestOneInput(std::uint8_t const *data, std::size_t size) { + // need to parse a config + if (size < 4) return 0; + auto const with_header = bool(data[0]); + auto const ignore_bad = bool(data[1]); + auto const delim_size = data[2]; + auto const quote_size = data[3]; + // 0 will be nullopt, everything else will be one smaller + // 0 , 1, 2, 3, 4 + // nullopt, 0, 1, 2, 3 + auto const delim_real_size = delim_size == 0 ? 0 : delim_size - 1; + auto const quote_real_size = quote_size == 0 ? 0 : quote_size - 1; + // not worth testing if too large + if (delim_real_size > 3 || quote_real_size > 3) return 0; + + // is there enough space for them to exist + if (size < 4 + delim_real_size + quote_real_size) return 0; + auto const delim_start = 4; + auto const delim_end = delim_start + delim_real_size; + auto const quote_start = delim_end; + auto const quote_end = quote_start + quote_real_size; + + auto *mem = mg::utils::NewDeleteResource(); + auto delim = delim_size == 0 ? std::optional{} : pmr_str(&data[delim_start], &data[delim_end], mem); + auto quote = quote_size == 0 ? std::optional{} : pmr_str(&data[quote_start], &data[quote_end], mem); + + auto const remaining = static_cast(size) - quote_end; + if (remaining < 0) __builtin_trap(); // if this hits, above parsing is wrong + + // ############################################################################################################# + + // build Config + auto cfg = csv::Reader::Config{with_header, ignore_bad, std::move(delim), std::move(quote)}; + + // build CSV source + auto ss = std::stringstream{}; + ss.write(reinterpret_cast(&data[quote_end]), static_cast(remaining)); + auto source = csv::StreamCsvSource{std::move(ss)}; + + // ############################################################################################################# + try { + auto reader = memgraph::csv::Reader(std::move(source), std::move(cfg)); + auto const header = reader.GetHeader(); + asm volatile("" : : "g"(header) : "memory"); + while (true) { + auto row = reader.GetNextRow(mem); + if (!row) break; + asm volatile("" : : "g"(row) : "memory"); + } + } catch (csv::CsvReadException const &) { + // CsvReadException is ok + } + + return 0; +} diff --git a/src/utils/csv_parsing.hpp b/src/csv/include/csv/parsing.hpp similarity index 56% rename from src/utils/csv_parsing.hpp rename to src/csv/include/csv/parsing.hpp index 928654ca8..72af67297 100644 --- a/src/utils/csv_parsing.hpp +++ b/src/csv/include/csv/parsing.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include "utils/exceptions.hpp" @@ -36,13 +37,53 @@ class CsvReadException : public utils::BasicException { using utils::BasicException::BasicException; }; +class FileCsvSource { + public: + explicit FileCsvSource(std::filesystem::path path); + std::istream &GetStream(); + + private: + std::filesystem::path path_; + std::ifstream stream_; +}; + +class StreamCsvSource { + public: + StreamCsvSource(std::stringstream stream) : stream_{std::move(stream)} {} + std::istream &GetStream() { return stream_; } + + private: + std::stringstream stream_; +}; + +class UrlCsvSource : public StreamCsvSource { + public: + UrlCsvSource(char const *url); +}; + +class CsvSource { + public: + static auto Create(utils::pmr::string const &csv_location) -> CsvSource; + CsvSource(FileCsvSource source) : source_{std::move(source)} {} + CsvSource(StreamCsvSource source) : source_{std::move(source)} {} + CsvSource(UrlCsvSource source) : source_{std::move(source)} {} + std::istream &GetStream(); + + private: + std::variant source_; +}; + class Reader { public: struct Config { Config() = default; Config(const bool with_header, const bool ignore_bad, std::optional delim, std::optional qt) - : with_header(with_header), ignore_bad(ignore_bad), delimiter(std::move(delim)), quote(std::move(qt)) {} + : with_header(with_header), ignore_bad(ignore_bad), delimiter(std::move(delim)), quote(std::move(qt)) { + // delimiter + quote can not be empty + if (delimiter && delimiter->empty()) delimiter.reset(); + if (quote && quote->empty()) quote.reset(); + } bool with_header{false}; bool ignore_bad{false}; @@ -53,16 +94,7 @@ class Reader { using Row = utils::pmr::vector; using Header = utils::pmr::vector; - Reader() = default; - explicit Reader(std::filesystem::path path, Config cfg, utils::MemoryResource *mem = utils::NewDeleteResource()) - : memory_(mem), path_(std::move(path)) { - read_config_.with_header = cfg.with_header; - read_config_.ignore_bad = cfg.ignore_bad; - read_config_.delimiter = cfg.delimiter ? std::move(*cfg.delimiter) : utils::pmr::string{",", memory_}; - read_config_.quote = cfg.quote ? std::move(*cfg.quote) : utils::pmr::string{"\"", memory_}; - InitializeStream(); - TryInitializeHeader(); - } + explicit Reader(CsvSource source, Config cfg, utils::MemoryResource *mem = utils::NewDeleteResource()); Reader(const Reader &) = delete; Reader &operator=(const Reader &) = delete; @@ -81,28 +113,18 @@ class Reader { }; using ParsingResult = utils::BasicResult; - [[nodiscard]] bool HasHeader() const; - const Header &GetHeader() const; - std::optional GetNextRow(utils::MemoryResource *mem); + + bool HasHeader() const; + auto GetHeader() const -> Header const &; + auto GetNextRow(utils::MemoryResource *mem) -> std::optional; private: - utils::MemoryResource *memory_; - std::filesystem::path path_; - std::ifstream csv_stream_; - Config read_config_; - uint64_t line_count_{1}; - uint16_t number_of_columns_{0}; - Header header_{memory_}; - - void InitializeStream(); - - void TryInitializeHeader(); - - std::optional GetNextLine(utils::MemoryResource *mem); - - ParsingResult ParseHeader(); - - ParsingResult ParseRow(utils::MemoryResource *mem); + // Some implementation issues that need clearing up, but this is mainly because + // I don't want `boost/iostreams/filtering_stream.hpp` included in this header file + // Because it causes issues when combined with antlr headers + // When we have C++20 modules this can be fixed + struct impl; + std::unique_ptr pimpl; }; } // namespace memgraph::csv diff --git a/src/utils/csv_parsing.cpp b/src/csv/parsing.cpp similarity index 59% rename from src/utils/csv_parsing.cpp rename to src/csv/parsing.cpp index 4744f2100..6d03dc7fd 100644 --- a/src/utils/csv_parsing.cpp +++ b/src/csv/parsing.cpp @@ -9,33 +9,132 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -#include "utils/csv_parsing.hpp" +#include "csv/parsing.hpp" #include +#include +#include +#include +#include + +#include "requests/requests.hpp" #include "utils/file.hpp" +#include "utils/on_scope_exit.hpp" #include "utils/string.hpp" +using PlainStream = boost::iostreams::filtering_istream; + namespace memgraph::csv { using ParseError = Reader::ParseError; -void Reader::InitializeStream() { - if (!std::filesystem::exists(path_)) { - throw CsvReadException("CSV file not found: {}", path_.string()); - } - csv_stream_.open(path_); - if (!csv_stream_.good()) { - throw CsvReadException("CSV file {} couldn't be opened!", path_.string()); - } +struct Reader::impl { + impl(CsvSource source, Reader::Config cfg, utils::MemoryResource *mem); + + [[nodiscard]] bool HasHeader() const { return read_config_.with_header; } + [[nodiscard]] auto Header() const -> Header const & { return header_; } + + auto GetNextRow(utils::MemoryResource *mem) -> std::optional; + + private: + void InitializeStream(); + + void TryInitializeHeader(); + + std::optional GetNextLine(utils::MemoryResource *mem); + + ParsingResult ParseHeader(); + + ParsingResult ParseRow(utils::MemoryResource *mem); + + utils::MemoryResource *memory_; + std::filesystem::path path_; + CsvSource source_; + PlainStream csv_stream_; + Config read_config_; + uint64_t line_count_{1}; + uint16_t number_of_columns_{0}; + Reader::Header header_{memory_}; +}; + +Reader::impl::impl(CsvSource source, Reader::Config cfg, utils::MemoryResource *mem) + : memory_(mem), source_(std::move(source)) { + read_config_.with_header = cfg.with_header; + read_config_.ignore_bad = cfg.ignore_bad; + read_config_.delimiter = cfg.delimiter ? std::move(*cfg.delimiter) : utils::pmr::string{",", memory_}; + read_config_.quote = cfg.quote ? std::move(*cfg.quote) : utils::pmr::string{"\"", memory_}; + InitializeStream(); + TryInitializeHeader(); } -std::optional Reader::GetNextLine(utils::MemoryResource *mem) { +enum class CompressionMethod : uint8_t { + NONE, + GZip, + BZip2, +}; + +/// Detect compression based on magic sequences +auto DetectCompressionMethod(std::istream &is) -> CompressionMethod { + // Ensure stream is reset + auto const on_exit = utils::OnScopeExit{[&]() { is.seekg(std::ios::beg); }}; + + // Note we must use bytes for comparison, not char + // + std::byte c{}; // this gets reused + auto const next_byte = [&](std::byte &b) { return bool(is.get(reinterpret_cast(b))); }; + if (!next_byte(c)) return CompressionMethod::NONE; + + auto const as_bytes = [](Args... args) { + return std::array{std::byte(args)...}; + }; + + // Gzip - 0x1F8B + constexpr static auto gzip_seq = as_bytes(0x1F, 0x8B); + if (c == gzip_seq[0]) { + if (!next_byte(c) || c != gzip_seq[1]) return CompressionMethod::NONE; + return CompressionMethod::GZip; + } + + // BZip2 - 0x425A68 + constexpr static auto bzip_seq = as_bytes(0x42, 0x5A, 0x68); + if (c == bzip_seq[0]) { + if (!next_byte(c) || c != bzip_seq[1]) return CompressionMethod::NONE; + if (!next_byte(c) || c != bzip_seq[2]) return CompressionMethod::NONE; + return CompressionMethod::BZip2; + } + return CompressionMethod::NONE; +} + +Reader::Reader(CsvSource source, Reader::Config cfg, utils::MemoryResource *mem) + : pimpl{new impl{std::move(source), std::move(cfg), mem}, [](impl *p) { delete p; }} {} + +void Reader::impl::InitializeStream() { + auto &source = source_.GetStream(); + + auto const method = DetectCompressionMethod(source); + switch (method) { + case CompressionMethod::GZip: + csv_stream_.push(boost::iostreams::gzip_decompressor{}); + break; + case CompressionMethod::BZip2: + csv_stream_.push(boost::iostreams::bzip2_decompressor{}); + break; + default: + break; + } + + csv_stream_.push(source); + MG_ASSERT(csv_stream_.auto_close(), "Should be 'auto close' for correct operation"); + MG_ASSERT(csv_stream_.is_complete(), "Should be 'complete' for correct operation"); +} + +std::optional Reader::impl::GetNextLine(utils::MemoryResource *mem) { utils::pmr::string line(mem); if (!std::getline(csv_stream_, line)) { // reached end of file or an I/0 error occurred if (!csv_stream_.good()) { - csv_stream_.close(); + csv_stream_.reset(); // this will close the file_stream_ and clear the chain } return std::nullopt; } @@ -43,13 +142,13 @@ std::optional Reader::GetNextLine(utils::MemoryResource *mem return std::move(line); } -Reader::ParsingResult Reader::ParseHeader() { +Reader::ParsingResult Reader::impl::ParseHeader() { // header must be the very first line in the file MG_ASSERT(line_count_ == 1, "Invalid use of {}", __func__); return ParseRow(memory_); } -void Reader::TryInitializeHeader() { +void Reader::impl::TryInitializeHeader() { if (!HasHeader()) { return; } @@ -67,16 +166,16 @@ void Reader::TryInitializeHeader() { header_ = std::move(*header); } -[[nodiscard]] bool Reader::HasHeader() const { return read_config_.with_header; } +[[nodiscard]] bool Reader::HasHeader() const { return pimpl->HasHeader(); } -const Reader::Header &Reader::GetHeader() const { return header_; } +const Reader::Header &Reader::GetHeader() const { return pimpl->Header(); } namespace { enum class CsvParserState : uint8_t { INITIAL_FIELD, NEXT_FIELD, QUOTING, EXPECT_DELIMITER, DONE }; } // namespace -Reader::ParsingResult Reader::ParseRow(utils::MemoryResource *mem) { +Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) { utils::pmr::vector row(mem); if (number_of_columns_ != 0) { row.reserve(number_of_columns_); @@ -140,9 +239,10 @@ Reader::ParsingResult Reader::ParseRow(utils::MemoryResource *mem) { break; } case CsvParserState::QUOTING: { + const auto quote_size = read_config_.quote->size(); const auto quote_now = utils::StartsWith(line_string_view, *read_config_.quote); - const auto quote_next = - utils::StartsWith(line_string_view.substr(read_config_.quote->size()), *read_config_.quote); + const auto quote_next = quote_size <= line_string_view.size() && + utils::StartsWith(line_string_view.substr(quote_size), *read_config_.quote); if (quote_now && quote_next) { // This is an escaped quote character. column += *read_config_.quote; @@ -216,12 +316,7 @@ Reader::ParsingResult Reader::ParseRow(utils::MemoryResource *mem) { return std::move(row); } -// Returns Reader::Row if the read row if valid; -// Returns std::nullopt if end of file is reached or an error occurred -// making it unreadable; -// @throws CsvReadException if a bad row is encountered, and the ignore_bad is set -// to 'true' in the Reader::Config. -std::optional Reader::GetNextRow(utils::MemoryResource *mem) { +std::optional Reader::impl::GetNextRow(utils::MemoryResource *mem) { auto row = ParseRow(mem); if (row.HasError()) { @@ -245,4 +340,44 @@ std::optional Reader::GetNextRow(utils::MemoryResource *mem) { return std::move(*row); } +// Returns Reader::Row if the read row if valid; +// Returns std::nullopt if end of file is reached or an error occurred +// making it unreadable; +// @throws CsvReadException if a bad row is encountered, and the ignore_bad is set +// to 'true' in the Reader::Config. +std::optional Reader::GetNextRow(utils::MemoryResource *mem) { return pimpl->GetNextRow(mem); } + +FileCsvSource::FileCsvSource(std::filesystem::path path) : path_(std::move(path)) { + if (!std::filesystem::exists(path_)) { + throw CsvReadException("CSV file not found: {}", path_.string()); + } + stream_.open(path_); + if (!stream_.good()) { + throw CsvReadException("CSV file {} couldn't be opened!", path_.string()); + } +} +std::istream &FileCsvSource::GetStream() { return stream_; } + +std::istream &CsvSource::GetStream() { + return *std::visit([](auto &&source) { return std::addressof(source.GetStream()); }, source_); +} + +auto CsvSource::Create(const utils::pmr::string &csv_location) -> CsvSource { + constexpr auto protocol_matcher = ctre::starts_with<"(https?|ftp)://">; + if (protocol_matcher(csv_location)) { + return csv::UrlCsvSource{csv_location.c_str()}; + } + return csv::FileCsvSource{csv_location}; +} + +// Helper for UrlCsvSource +auto urlToStringStream(const char *url) -> std::stringstream { + auto ss = std::stringstream{}; + if (!requests::DownloadToStream(url, ss)) { + throw CsvReadException("CSV was unable to be fetched from {}", url); + } + return ss; +}; + +UrlCsvSource::UrlCsvSource(const char *url) : StreamCsvSource{urlToStringStream(url)} {} } // namespace memgraph::csv diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index e7e2bd66e..63eedf5d2 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -37,12 +37,9 @@ set(mg_query_sources graph.cpp db_accessor.cpp) -find_package(Boost REQUIRED) - add_library(mg-query STATIC ${mg_query_sources}) target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(mg-query dl cppitertools Boost::headers) -target_link_libraries(mg-query mg-integrations-pulsar mg-integrations-kafka mg-storage-v2 mg-license mg-utils mg-kvstore mg-memory) +target_link_libraries(mg-query PUBLIC dl cppitertools Python3::Python mg-integrations-pulsar mg-integrations-kafka mg-storage-v2 mg-license mg-utils mg-kvstore mg-memory mg::csv) if(NOT "${MG_PYTHON_PATH}" STREQUAL "") set(Python3_ROOT_DIR "${MG_PYTHON_PATH}") endif() @@ -51,7 +48,6 @@ if("${MG_PYTHON_VERSION}" STREQUAL "") else() find_package(Python3 "${MG_PYTHON_VERSION}" EXACT REQUIRED COMPONENTS Development) endif() -target_link_libraries(mg-query Python3::Python) # Generate Antlr openCypher parser @@ -94,4 +90,4 @@ add_library(antlr_opencypher_parser_lib STATIC ${antlr_opencypher_generated_src} add_dependencies(antlr_opencypher_parser_lib generate_opencypher_parser) target_link_libraries(antlr_opencypher_parser_lib antlr4) -target_link_libraries(mg-query antlr_opencypher_parser_lib) +target_link_libraries(mg-query PUBLIC antlr_opencypher_parser_lib) diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 3092899f4..a31243385 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -30,6 +30,7 @@ #include #include "auth/models.hpp" +#include "csv/parsing.hpp" #include "glue/communication.hpp" #include "license/license.hpp" #include "memory/memory_control.hpp" @@ -64,7 +65,6 @@ #include "storage/v2/storage_mode.hpp" #include "utils/algorithm.hpp" #include "utils/build_info.hpp" -#include "utils/csv_parsing.hpp" #include "utils/event_counter.hpp" #include "utils/event_histogram.hpp" #include "utils/exceptions.hpp" diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index b99bddffd..040f2da95 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -29,6 +29,7 @@ #include "query/common.hpp" #include "spdlog/spdlog.h" +#include "csv/parsing.hpp" #include "license/license.hpp" #include "query/auth_checker.hpp" #include "query/context.hpp" @@ -47,7 +48,6 @@ #include "storage/v2/property_value.hpp" #include "storage/v2/view.hpp" #include "utils/algorithm.hpp" -#include "utils/csv_parsing.hpp" #include "utils/event_counter.hpp" #include "utils/exceptions.hpp" #include "utils/fnv.hpp" @@ -4822,7 +4822,7 @@ class LoadCsvCursor : public Cursor { // persists between pulls, so it can't use the evalutation context memory // resource. return csv::Reader( - *maybe_file, + csv::CsvSource::Create(*maybe_file), csv::Reader::Config(self_->with_header_, self_->ignore_bad_, std::move(maybe_delim), std::move(maybe_quote)), utils::NewDeleteResource()); } diff --git a/src/requests/CMakeLists.txt b/src/requests/CMakeLists.txt index 5ae3469e4..a63da920a 100644 --- a/src/requests/CMakeLists.txt +++ b/src/requests/CMakeLists.txt @@ -7,5 +7,7 @@ find_package(gflags REQUIRED) add_library(mg-requests STATIC ${requests_src_files}) -target_link_libraries(mg-requests mg-utils spdlog::spdlog fmt::fmt gflags json ${CURL_LIBRARIES}) +target_link_libraries(mg-requests + PUBLIC mg-utils spdlog::spdlog fmt::fmt gflags json ${CURL_LIBRARIES} + PRIVATE lib::ctre) target_include_directories(mg-requests PRIVATE ${CURL_INCLUDE_DIRS}) diff --git a/src/requests/requests.cpp b/src/requests/requests.cpp index da36c0c6a..d6c37bd61 100644 --- a/src/requests/requests.cpp +++ b/src/requests/requests.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -16,6 +16,7 @@ #include #include #include +#include #include "utils/logging.hpp" @@ -113,4 +114,35 @@ bool CreateAndDownloadFile(const std::string &url, const std::string &path, int return true; } +auto DownloadToStream(char const *url, std::ostream &os) -> bool { + constexpr auto WriteCallback = [](char *ptr, size_t size, size_t nmemb, std::ostream *os) -> size_t { + auto const totalSize = static_cast(size * nmemb); + os->write(ptr, totalSize); + return totalSize; + }; + + auto *curl_handle{curl_easy_init()}; + curl_easy_setopt(curl_handle, CURLOPT_URL, url); + curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, +WriteCallback); + curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &os); + + auto const res = curl_easy_perform(curl_handle); + long response_code = 0; // NOLINT + curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code); + curl_easy_cleanup(curl_handle); + + if (res != CURLE_OK) { + SPDLOG_WARN("Couldn't perform request: {}", curl_easy_strerror(res)); + return false; + } + + constexpr auto protocol_matcher = ctre::starts_with<"(https?|ftp)://">; + if (protocol_matcher(url) && response_code != 200) { + SPDLOG_WARN("Request response code isn't 200 (received {})!", response_code); + return false; + } + + return true; +} + } // namespace memgraph::requests diff --git a/src/requests/requests.hpp b/src/requests/requests.hpp index 4ded90464..fcf8838a3 100644 --- a/src/requests/requests.hpp +++ b/src/requests/requests.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -11,8 +11,11 @@ #pragma once +#include +#include #include +#include #include namespace memgraph::requests { @@ -47,4 +50,16 @@ bool RequestPostJson(const std::string &url, const nlohmann::json &data, int tim */ bool CreateAndDownloadFile(const std::string &url, const std::string &path, int timeout_in_seconds = 10); +/** + * Downloads content into a stream + * + * This function sends a GET request an put the response within a stream. + * Using c-string because internals interop with a C API + * + * @param url url of the contents + * @param os an output stream + * @return bool true if the request was successful, false otherwise. + */ +auto DownloadToStream(char const *url, std::ostream &os) -> bool; + } // namespace memgraph::requests diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index 8016af7b5..d5144daf6 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -4,7 +4,6 @@ set(utils_src_files event_counter.cpp event_gauge.cpp event_histogram.cpp - csv_parsing.cpp file.cpp file_locker.cpp memory.cpp diff --git a/src/utils/on_scope_exit.hpp b/src/utils/on_scope_exit.hpp index 225c0d870..70aec52b6 100644 --- a/src/utils/on_scope_exit.hpp +++ b/src/utils/on_scope_exit.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -34,17 +34,25 @@ namespace memgraph::utils { * // long block of code, might trow an exception * } */ -class OnScopeExit { +template +class [[nodiscard]] OnScopeExit { public: - explicit OnScopeExit(const std::function &function) : function_(function) {} - ~OnScopeExit() { function_(); } - - void Disable() { - function_ = [] {}; + explicit OnScopeExit(Callable &&function) : function_{std::forward(function)}, doCall_{true} {} + OnScopeExit(OnScopeExit const &) = delete; + OnScopeExit(OnScopeExit &&) = delete; + OnScopeExit &operator=(OnScopeExit const &) = delete; + OnScopeExit &operator=(OnScopeExit &&) = delete; + ~OnScopeExit() { + if (doCall_) function_(); } + void Disable() { doCall_ = false; } + private: std::function function_; + bool doCall_; }; +template +OnScopeExit(Callable &&) -> OnScopeExit; } // namespace memgraph::utils diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 48000668c..411c5571c 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -272,8 +272,8 @@ target_link_libraries(${test_prefix}utils_file_locker mg-utils fmt) add_unit_test(utils_thread_pool.cpp) target_link_libraries(${test_prefix}utils_thread_pool mg-utils fmt) -add_unit_test(utils_csv_parsing.cpp ${CMAKE_SOURCE_DIR}/src/utils/csv_parsing.cpp) -target_link_libraries(${test_prefix}utils_csv_parsing mg-utils fmt) +add_unit_test(csv_csv_parsing.cpp) +target_link_libraries(${test_prefix}csv_csv_parsing mg::csv) add_unit_test(utils_async_timer.cpp) target_link_libraries(${test_prefix}utils_async_timer mg-utils) diff --git a/tests/unit/utils_csv_parsing.cpp b/tests/unit/csv_csv_parsing.cpp similarity index 71% rename from tests/unit/utils_csv_parsing.cpp rename to tests/unit/csv_csv_parsing.cpp index 9fef48af1..13362d2e2 100644 --- a/tests/unit/utils_csv_parsing.cpp +++ b/tests/unit/csv_csv_parsing.cpp @@ -9,13 +9,29 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include "csv/parsing.hpp" #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "utils/csv_parsing.hpp" - #include "utils/string.hpp" -class CsvReaderTest : public ::testing::TestWithParam { +#include +#include +#include + +using namespace memgraph::csv; + +enum class CompressionMethod : uint8_t { + NONE, + GZip, + BZip2, +}; + +struct TestParam { + const char *newline; + CompressionMethod compressionMethod; +}; + +class CsvReaderTest : public ::testing::TestWithParam { protected: const std::filesystem::path csv_directory{std::filesystem::temp_directory_path() / "csv_testing"}; @@ -41,8 +57,9 @@ class CsvReaderTest : public ::testing::TestWithParam { namespace { class FileWriter { public: - explicit FileWriter(const std::filesystem::path path, std::string newline = "\n") : newline_{std::move(newline)} { - stream_.open(path); + explicit FileWriter(std::filesystem::path path, std::string newline, CompressionMethod compressionMethod) + : newline_{std::move(newline)}, compressionMethod_{compressionMethod}, path_{std::move(path)} { + stream_.open(path_); } FileWriter(const FileWriter &) = delete; @@ -51,7 +68,25 @@ class FileWriter { FileWriter(FileWriter &&) = delete; FileWriter &operator=(FileWriter &&) = delete; - void Close() { stream_.close(); } + void Close() { + stream_.close(); + if (compressionMethod_ == CompressionMethod::NONE) return; + + auto input = std::ifstream{path_, std::ios::binary}; + auto tmp_path = std::filesystem::path{path_.string() + ".gz"}; + auto output = std::ofstream{tmp_path, std::ios::binary | std::ios::trunc}; + + boost::iostreams::filtering_ostream stream; + if (compressionMethod_ == CompressionMethod::GZip) stream.push(boost::iostreams::gzip_compressor()); + if (compressionMethod_ == CompressionMethod::BZip2) stream.push(boost::iostreams::bzip2_compressor()); + stream.push(output); + stream << input.rdbuf(); + input.close(); + stream.reset(); + output.close(); + std::filesystem::remove(path_); + std::filesystem::rename(tmp_path, path_); + } size_t WriteLine(const std::string_view line) { if (!stream_.is_open()) { @@ -67,6 +102,8 @@ class FileWriter { private: std::ofstream stream_; std::string newline_; + CompressionMethod compressionMethod_; + std::filesystem::path path_; }; std::string CreateRow(const std::vector &columns, const std::string_view delim) { @@ -86,7 +123,7 @@ auto ToPmrColumns(const std::vector &columns) { TEST_P(CsvReaderTest, CommaDelimiter) { // create a file with a single valid row; const auto filepath = csv_directory / "bla.csv"; - auto writer = FileWriter(filepath, GetParam()); + auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod); const std::vector columns{"A", "B", "C"}; writer.WriteLine(CreateRow(columns, ",")); @@ -100,8 +137,8 @@ TEST_P(CsvReaderTest, CommaDelimiter) { memgraph::utils::pmr::string delimiter{",", mem}; memgraph::utils::pmr::string quote{"\"", mem}; - memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; - auto reader = memgraph::csv::Reader(filepath, cfg, mem); + Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = Reader(FileCsvSource{filepath}, cfg, mem); auto parsed_row = reader.GetNextRow(mem); ASSERT_EQ(*parsed_row, ToPmrColumns(columns)); @@ -109,7 +146,7 @@ TEST_P(CsvReaderTest, CommaDelimiter) { TEST_P(CsvReaderTest, SemicolonDelimiter) { const auto filepath = csv_directory / "bla.csv"; - auto writer = FileWriter(filepath, GetParam()); + auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod); memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource()); @@ -123,8 +160,8 @@ TEST_P(CsvReaderTest, SemicolonDelimiter) { const bool with_header = false; const bool ignore_bad = false; - const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; - auto reader = memgraph::csv::Reader(filepath, cfg, mem); + const Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = Reader(FileCsvSource{filepath}, cfg, mem); auto parsed_row = reader.GetNextRow(mem); ASSERT_EQ(*parsed_row, ToPmrColumns(columns)); @@ -135,7 +172,7 @@ TEST_P(CsvReaderTest, SkipBad) { // missing closing quote); // the last row is valid; const auto filepath = csv_directory / "bla.csv"; - auto writer = FileWriter(filepath, GetParam()); + auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod); memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource()); @@ -156,8 +193,8 @@ TEST_P(CsvReaderTest, SkipBad) { // parser's output should be solely the valid row; const bool with_header = false; const bool ignore_bad = true; - const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; - auto reader = memgraph::csv::Reader(filepath, cfg, mem); + const Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = Reader(FileCsvSource{filepath}, cfg, mem); auto parsed_row = reader.GetNextRow(mem); ASSERT_EQ(*parsed_row, ToPmrColumns(columns_good)); @@ -168,10 +205,10 @@ TEST_P(CsvReaderTest, SkipBad) { // an exception must be thrown; const bool with_header = false; const bool ignore_bad = false; - const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; - auto reader = memgraph::csv::Reader(filepath, cfg, mem); + const Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = Reader(FileCsvSource{filepath}, cfg, mem); - EXPECT_THROW(reader.GetNextRow(mem), memgraph::csv::CsvReadException); + EXPECT_THROW(reader.GetNextRow(mem), CsvReadException); } } @@ -179,7 +216,7 @@ TEST_P(CsvReaderTest, AllRowsValid) { // create a file with all rows valid; // parser should return 'std::nullopt' const auto filepath = csv_directory / "bla.csv"; - auto writer = FileWriter(filepath, GetParam()); + auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod); memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource()); @@ -195,8 +232,8 @@ TEST_P(CsvReaderTest, AllRowsValid) { const bool with_header = false; const bool ignore_bad = false; - const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; - auto reader = memgraph::csv::Reader(filepath, cfg); + const Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = Reader(FileCsvSource{filepath}, cfg); const auto pmr_columns = ToPmrColumns(columns); while (auto parsed_row = reader.GetNextRow(mem)) { @@ -208,7 +245,7 @@ TEST_P(CsvReaderTest, SkipAllRows) { // create a file with all rows invalid (containing a string with a missing closing quote); // parser should return 'std::nullopt' const auto filepath = csv_directory / "bla.csv"; - auto writer = FileWriter(filepath, GetParam()); + auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod); memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource()); @@ -224,8 +261,8 @@ TEST_P(CsvReaderTest, SkipAllRows) { const bool with_header = false; const bool ignore_bad = true; - const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; - auto reader = memgraph::csv::Reader(filepath, cfg); + const Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = Reader(FileCsvSource{filepath}, cfg); auto parsed_row = reader.GetNextRow(mem); ASSERT_EQ(parsed_row, std::nullopt); @@ -233,7 +270,7 @@ TEST_P(CsvReaderTest, SkipAllRows) { TEST_P(CsvReaderTest, WithHeader) { const auto filepath = csv_directory / "bla.csv"; - auto writer = FileWriter(filepath, GetParam()); + auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod); memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource()); @@ -251,8 +288,8 @@ TEST_P(CsvReaderTest, WithHeader) { const bool with_header = true; const bool ignore_bad = false; - const memgraph::csv::Reader::Config cfg(with_header, ignore_bad, delimiter, quote); - auto reader = memgraph::csv::Reader(filepath, cfg); + const Reader::Config cfg(with_header, ignore_bad, delimiter, quote); + auto reader = Reader(FileCsvSource{filepath}, cfg); const auto pmr_header = ToPmrColumns(header); ASSERT_EQ(reader.GetHeader(), pmr_header); @@ -268,7 +305,7 @@ TEST_P(CsvReaderTest, MultilineQuotedString) { // string spanning two lines; // parser should return two valid rows const auto filepath = csv_directory / "bla.csv"; - auto writer = FileWriter(filepath, GetParam()); + auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod); memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource()); @@ -287,8 +324,8 @@ TEST_P(CsvReaderTest, MultilineQuotedString) { const bool with_header = false; const bool ignore_bad = true; - const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; - auto reader = memgraph::csv::Reader(filepath, cfg); + const Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = Reader(FileCsvSource{filepath}, cfg); auto parsed_row = reader.GetNextRow(mem); ASSERT_EQ(*parsed_row, ToPmrColumns(first_row)); @@ -302,7 +339,7 @@ TEST_P(CsvReaderTest, EmptyColumns) { // create a file with all rows valid; // parser should return 'std::nullopt' const auto filepath = csv_directory / "bla.csv"; - auto writer = FileWriter(filepath, GetParam()); + auto writer = FileWriter(filepath, GetParam().newline, GetParam().compressionMethod); memgraph::utils::MemoryResource *mem(memgraph::utils::NewDeleteResource()); @@ -319,8 +356,8 @@ TEST_P(CsvReaderTest, EmptyColumns) { const bool with_header = false; const bool ignore_bad = false; - const memgraph::csv::Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; - auto reader = memgraph::csv::Reader(filepath, cfg); + const Reader::Config cfg{with_header, ignore_bad, delimiter, quote}; + auto reader = Reader(FileCsvSource{filepath}, cfg); for (const auto &expected_row : expected_rows) { const auto pmr_expected_row = ToPmrColumns(expected_row); @@ -330,4 +367,8 @@ TEST_P(CsvReaderTest, EmptyColumns) { } } -INSTANTIATE_TEST_CASE_P(NewlineParameterizedTest, CsvReaderTest, ::testing::Values("\n", "\r\n")); +INSTANTIATE_TEST_CASE_P(NewlineParameterizedTest, CsvReaderTest, + ::testing::Values(TestParam{"\n", CompressionMethod::NONE}, + TestParam{"\r\n", CompressionMethod::NONE}, + TestParam{"\n", CompressionMethod::GZip}, + TestParam{"\n", CompressionMethod::BZip2})); diff --git a/tests/unit/interpreter.cpp b/tests/unit/interpreter.cpp index d7ad623d6..f2ce07479 100644 --- a/tests/unit/interpreter.cpp +++ b/tests/unit/interpreter.cpp @@ -15,6 +15,7 @@ #include "communication/bolt/v1/value.hpp" #include "communication/result_stream_faker.hpp" +#include "csv/parsing.hpp" #include "glue/communication.hpp" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -28,7 +29,6 @@ #include "query_common.hpp" #include "storage/v2/isolation_level.hpp" #include "storage/v2/property_value.hpp" -#include "utils/csv_parsing.hpp" #include "utils/logging.hpp" namespace {