diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 71954a6bd..ffe266217 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -80,10 +80,6 @@ add_custom_command(TARGET memgraph POST_BUILD
   BYPRODUCTS ${CMAKE_BINARY_DIR}/config/memgraph.conf
   COMMENT "Generating memgraph configuration file")
 
-# ----------------------------------------------------------------------------
-# END Memgraph Single Node v2 Executable
-# ----------------------------------------------------------------------------
-
 # Everything here is under "memgraph" install component.
 set(CMAKE_INSTALL_DEFAULT_COMPONENT_NAME "memgraph")
 
@@ -112,3 +108,27 @@ endif()
 
 # Create empty directories for default location of lib and log.
 install(CODE "file(MAKE_DIRECTORY \$ENV{DESTDIR}/var/log/memgraph \$ENV{DESTDIR}/var/lib/memgraph)")
+
+# ----------------------------------------------------------------------------
+# END Memgraph Single Node v2 Executable
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Memgraph CSV Import Tool Executable
+# ----------------------------------------------------------------------------
+
+add_executable(mg_import_csv mg_import_csv.cpp)
+target_link_libraries(mg_import_csv mg-storage-v2)
+
+# Strip the executable in release build.
+if (lower_build_type STREQUAL "release")
+  add_custom_command(TARGET mg_import_csv POST_BUILD
+                     COMMAND strip -s mg_import_csv
+                     COMMENT "Stripping symbols and sections from mg_import_csv")
+endif()
+
+install(TARGETS mg_import_csv RUNTIME DESTINATION bin)
+
+# ----------------------------------------------------------------------------
+# Memgraph CSV Import Tool Executable
+# ----------------------------------------------------------------------------
diff --git a/src/config.hpp b/src/helpers.hpp
similarity index 54%
rename from src/config.hpp
rename to src/helpers.hpp
index 7669a8398..20359dcc2 100644
--- a/src/config.hpp
+++ b/src/helpers.hpp
@@ -1,5 +1,13 @@
 #pragma once
 
+#include <pwd.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
 #include <filesystem>
 #include <string>
 #include <vector>
@@ -13,7 +21,7 @@
 /// 1) /etc/memgraph/memgraph.conf
 /// 2) ~/.memgraph/config
 /// 3) env - MEMGRAPH_CONFIG
-void LoadConfig() {
+inline void LoadConfig(const std::string &product_name) {
   namespace fs = std::filesystem;
   std::vector<fs::path> configs = {fs::path("/etc/memgraph/memgraph.conf")};
   if (getenv("HOME") != nullptr)
@@ -40,7 +48,7 @@ void LoadConfig() {
   int custom_argc = static_cast<int>(flagfile_arguments.size()) + 1;
   char **custom_argv = new char *[custom_argc];
 
-  custom_argv[0] = strdup(std::string("memgraph").c_str());
+  custom_argv[0] = strdup(product_name.c_str());
   for (int i = 0; i < static_cast<int>(flagfile_arguments.size()); ++i) {
     custom_argv[i + 1] = strdup(flagfile_arguments[i].c_str());
   }
@@ -53,3 +61,36 @@
   for (int i = 0; i < custom_argc; ++i) free(custom_argv[i]);
   delete[] custom_argv;
 }
+
+/// Verifies that the owner of the data directory is the same user that started
+/// the current process.
+inline void VerifyDataDirectoryOwnerAndProcessUser(
+    const std::string &data_directory) {
+  // Get the process user ID.
+  auto process_euid = geteuid();
+
+  // Get the data directory owner ID.
+  struct stat statbuf;
+  auto ret = stat(data_directory.c_str(), &statbuf);
+  if (ret != 0 && errno == ENOENT) {
+    // The directory doesn't currently exist.
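    // (If the directory is missing there is nothing to verify yet: it will, if
    // needed, be created later by this same process, so its owner will match.)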
+    return;
+  }
+  CHECK(ret == 0) << "Couldn't get stat for '" << data_directory
+                  << "' because of: " << strerror(errno) << " (" << errno
+                  << ")";
+  auto directory_owner = statbuf.st_uid;
+
+  auto get_username = [](auto uid) {
+    auto info = getpwuid(uid);
+    if (!info) return std::to_string(uid);
+    return std::string(info->pw_name);
+  };
+
+  auto user_process = get_username(process_euid);
+  auto user_directory = get_username(directory_owner);
+  CHECK(process_euid == directory_owner)
+      << "The process is running as user " << user_process
+      << ", but the data directory is owned by user " << user_directory
+      << ". Please start the process as user " << user_directory << "!";
+}
diff --git a/src/memgraph.cpp b/src/memgraph.cpp
index 3db9d1049..12822888d 100644
--- a/src/memgraph.cpp
+++ b/src/memgraph.cpp
@@ -20,8 +20,8 @@
 #include "communication/init.hpp"
 #include "communication/server.hpp"
 #include "communication/session.hpp"
-#include "config.hpp"
 #include "glue/communication.hpp"
+#include "helpers.hpp"
 #include "py/py.hpp"
 #include "query/exceptions.hpp"
 #include "query/interpreter.hpp"
@@ -70,6 +70,8 @@ DEFINE_string(bolt_server_name_for_init, "",
               "Bolt INIT message.");
 
 // General purpose flags.
+// NOTE: The `data_directory` flag must be the same here and in
+// `mg_import_csv`. If you change it, make sure to change it there as well.
 DEFINE_string(data_directory, "mg_data",
               "Path to directory in which to save all permanent data.");
 DEFINE_string(log_file, "", "Path to where the log should be stored.");
@@ -85,6 +87,8 @@ DEFINE_uint64(memory_warning_threshold, 1024,
 DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30,
                         "Storage garbage collector interval (in seconds).",
                         FLAG_IN_RANGE(1, 24 * 3600));
+// NOTE: The `storage_properties_on_edges` flag must be the same here and in
+// `mg_import_csv`. If you change it, make sure to change it there as well.
 DEFINE_bool(storage_properties_on_edges, false,
             "Controls whether edges have properties.");
 DEFINE_bool(storage_recover_on_startup, false,
@@ -811,7 +815,7 @@ int main(int argc, char **argv) {
 
   // Load config before parsing arguments, so that flags from the command line
   // overwrite the config.
-  LoadConfig();
+  LoadConfig("memgraph");
   gflags::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
 
@@ -821,6 +825,10 @@
 
   // Unhandled exception handler init.
   std::set_terminate(&utils::TerminateHandler);
 
+  // Verify that the user that started the Memgraph process is the same user
+  // that is the owner of the data directory.
+ VerifyDataDirectoryOwnerAndProcessUser(FLAGS_data_directory); + // Initialize Python auto *program_name = Py_DecodeLocale(argv[0], nullptr); CHECK(program_name); diff --git a/src/mg_import_csv.cpp b/src/mg_import_csv.cpp new file mode 100644 index 000000000..b07a988e8 --- /dev/null +++ b/src/mg_import_csv.cpp @@ -0,0 +1,507 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "helpers.hpp" +#include "storage/v2/storage.hpp" +#include "utils/exceptions.hpp" +#include "utils/string.hpp" +#include "utils/timer.hpp" +#include "version.hpp" + +bool ValidateNotNewline(const char *flagname, const std::string &value) { + auto has_no_newline = value.find('\n') == std::string::npos; + if (!has_no_newline) { + printf("The argument '%s' cannot contain newline character\n", flagname); + } + return has_no_newline; +} + +bool ValidateNoWhitespace(const char *flagname, const std::string &value) { + auto trimmed = utils::Trim(value); + if (trimmed.empty() && !value.empty()) { + printf("The argument '%s' cannot be only whitespace\n", flagname); + return false; + } else if (!trimmed.empty()) { + for (auto c : trimmed) { + if (std::isspace(c)) { + printf("The argument '%s' cannot contain whitespace\n", flagname); + return false; + } + } + } + return true; +} + +// Memgraph flags. +// NOTE: These flags must be identical as the flags in the main Memgraph binary. +// They are used to automatically load the same configuration as the main +// Memgraph binary so that the flags don't need to be specified when importing a +// CSV file on a correctly set-up Memgraph installation. +DEFINE_string(data_directory, "mg_data", + "Path to directory in which to save all permanent data."); +DEFINE_bool(storage_properties_on_edges, false, + "Controls whether relationships have properties."); + +// CSV import flags. +DEFINE_string(array_delimiter, ";", + "Delimiter between elements of array values."); +DEFINE_string(delimiter, ",", "Delimiter between each field in the CSV."); +DEFINE_string(quote, "\"", + "Quotation character for data in the CSV. Cannot contain '\n'"); +DEFINE_validator(quote, &ValidateNotNewline); +DEFINE_bool(skip_duplicate_nodes, false, + "Set to true to skip duplicate nodes instead of raising an error."); +// Arguments `--nodes` and `--relationships` can be input multiple times and are +// handled with custom parsing. +DEFINE_string(nodes, "", "CSV file containing graph nodes (vertices)."); +DEFINE_string(node_label, "", + "Specify additional label for nodes. To add multiple labels, " + "repeat the flag multiple times."); +DEFINE_validator(node_label, &ValidateNoWhitespace); +DEFINE_string(relationships, "", + "CSV file containing graph relationships (edges)."); +DEFINE_string(relationship_type, "", + "Overwrite the relationship type from csv with the given value."); +DEFINE_validator(relationship_type, &ValidateNoWhitespace); + +std::vector ParseRepeatedFlag(const std::string &flagname, + int argc, char *argv[]) { + std::vector values; + for (int i = 1; i < argc; ++i) { + std::string flag(argv[i]); + int matched_flag_dashes = 0; + if (utils::StartsWith(flag, "--" + flagname)) + matched_flag_dashes = 2; + else if (utils::StartsWith(flag, "-" + flagname)) + matched_flag_dashes = 1; + // Get the value if we matched the flag. 
+ if (matched_flag_dashes != 0) { + std::string value; + auto maybe_value = flag.substr(flagname.size() + matched_flag_dashes); + if (maybe_value.empty() && i + 1 < argc) + value = argv[++i]; + else if (!maybe_value.empty() && maybe_value.front() == '=') + value = maybe_value.substr(1); + CHECK(!value.empty()) << "The argument '" << flagname << "' is required"; + values.push_back(value); + } + } + return values; +} + +// A field describing the CSV column. +struct Field { + // Name of the field. + std::string name; + // Type of the values under this field. + std::string type; +}; + +// A node ID from CSV format. +struct NodeId { + std::string id; + // Group/space of IDs. ID must be unique in a single group. + std::string id_space; +}; + +bool operator==(const NodeId &a, const NodeId &b) { + return a.id == b.id && a.id_space == b.id_space; +} + +std::ostream &operator<<(std::ostream &stream, const NodeId &node_id) { + return stream << node_id.id << "(" << node_id.id_space << ")"; +} + +namespace std { + +template <> +struct hash { + size_t operator()(const NodeId &node_id) const { + size_t id_hash = std::hash{}(node_id.id); + size_t id_space_hash = std::hash{}(node_id.id_space); + return id_hash ^ (id_space_hash << 1UL); + } +}; + +} // namespace std + +// Exception used to indicate that something went wrong during data loading. +class LoadException : public utils::BasicException { + public: + using utils::BasicException::BasicException; +}; + +/// @throw LoadException +std::pair, uint64_t> ReadRow(std::istream &stream) { + std::vector row; + bool quoting = false; + std::vector column; + std::string line; + uint64_t lines_count = 0; + + auto check_quote = [&line](int curr_idx) { + return curr_idx + FLAGS_quote.size() <= line.size() && + line.compare(curr_idx, FLAGS_quote.size(), FLAGS_quote) == 0; + }; + + do { + if (!std::getline(stream, line)) { + if (quoting) { + throw LoadException( + "There is no more data left to load while inside a quoted string. " + "Did you forget to close the quote?"); + } else { + // The whole row was processed. + break; + } + } + ++lines_count; + for (size_t i = 0; i < line.size(); ++i) { + auto c = line[i]; + if (quoting) { + if (check_quote(i)) { + quoting = false; + i += FLAGS_quote.size() - 1; + } else { + column.push_back(c); + } + } else if (check_quote(i)) { + // Hopefully, escaping isn't needed + quoting = true; + i += FLAGS_quote.size() - 1; + } else if (c == FLAGS_delimiter.front()) { + row.emplace_back(column.begin(), column.end()); + column.clear(); + // Handle special case when delimiter is the last + // character in line. This means that another + // empty column needs to be added. + if (i == line.size() - 1) { + row.emplace_back(""); + } + } else { + column.push_back(c); + } + } + } while (quoting); + + if (!column.empty()) row.emplace_back(column.begin(), column.end()); + return {std::move(row), lines_count}; +} + +/// @throw LoadException +std::pair, uint64_t> ReadHeader(std::istream &stream) { + auto [row, lines_count] = ReadRow(stream); + std::vector fields; + fields.reserve(row.size()); + for (const auto &value : row) { + auto name_and_type = utils::Split(value, ":"); + if (name_and_type.size() != 1U && name_and_type.size() != 2U) + throw LoadException( + "Expected a name and optionally a type, got '{}'. Did you specify a " + "correct CSV delimiter?", + value); + auto name = name_and_type[0]; + // When type is missing, default is string. 
+ std::string type("string"); + if (name_and_type.size() == 2U) type = utils::Trim(name_and_type[1]); + fields.push_back(Field{name, type}); + } + return {std::move(fields), lines_count}; +} + +/// @throw LoadException +storage::PropertyValue StringToValue(const std::string &str, + const std::string &type) { + // Empty string signifies Null. + if (str.empty()) return storage::PropertyValue(); + auto convert = [](const auto &str, const auto &type) { + if (type == "int" || type == "long" || type == "byte" || type == "short") { + std::istringstream ss(str); + int64_t val; + ss >> val; + return storage::PropertyValue(val); + } else if (type == "float" || type == "double") { + return storage::PropertyValue(utils::ParseDouble(str)); + } else if (type == "boolean") { + if (utils::ToLowerCase(str) == "true") { + return storage::PropertyValue(true); + } else { + return storage::PropertyValue(false); + } + } else if (type == "char" || type == "string") { + return storage::PropertyValue(str); + } + throw LoadException("Unexpected type: {}", type); + }; + // Type *not* ending with '[]', signifies regular value. + if (!utils::EndsWith(type, "[]")) return convert(str, type); + // Otherwise, we have an array type. + auto elem_type = type.substr(0, type.size() - 2); + auto elems = utils::Split(str, FLAGS_array_delimiter); + std::vector array; + array.reserve(elems.size()); + for (const auto &elem : elems) { + array.push_back(convert(std::string(utils::Trim(elem)), elem_type)); + } + return storage::PropertyValue(std::move(array)); +} + +/// @throw LoadException +std::string GetIdSpace(const std::string &type) { + // The format of this field is as follows: + // [START_|END_]ID[()] + std::regex format(R"(^(START_|END_)?ID(\(([^\(\)]+)\))?$)", + std::regex::extended); + std::smatch res; + if (!std::regex_match(type, res, format)) + throw LoadException( + "Expected the ID field to look like '[START_|END_]ID[()]', " + "but got '{}' instead", + type); + CHECK(res.size() == 4) << "Invalid regex match result!"; + return res[3]; +} + +/// @throw LoadException +void ProcessNodeRow(storage::Storage *store, const std::vector &fields, + const std::vector &row, + const std::vector &additional_labels, + std::unordered_map *node_id_map) { + std::optional id; + auto acc = store->Access(); + auto node = acc.CreateVertex(); + for (size_t i = 0; i < row.size(); ++i) { + const auto &field = fields[i]; + std::string value(utils::Trim(row[i])); + if (utils::StartsWith(field.type, "ID")) { + if (id) throw LoadException("Only one node ID must be specified"); + NodeId node_id{value, GetIdSpace(field.type)}; + auto it = node_id_map->find(node_id); + if (it != node_id_map->end()) { + if (FLAGS_skip_duplicate_nodes) { + LOG(WARNING) << "Skipping duplicate node with id '" << node_id << "'"; + return; + } else { + throw LoadException("Node with id '{}' already exists", node_id); + } + } + node_id_map->emplace(node_id, node.Gid()); + auto node_property = node.SetProperty(acc.NameToProperty("id"), + storage::PropertyValue(node_id.id)); + if (!node_property.HasValue()) + throw LoadException("Couldn't add property 'id' to the node"); + if (!*node_property) + throw LoadException("The property 'id' already exists"); + id = node_id; + } else if (field.type == "LABEL") { + for (const auto &label : utils::Split(value, FLAGS_array_delimiter)) { + auto node_label = node.AddLabel(acc.NameToLabel(utils::Trim(label))); + if (!node_label.HasValue()) + throw LoadException("Couldn't add label '{}' to the node", + utils::Trim(label)); + if 
(!*node_label) + throw LoadException("The label '{}' already exists", + utils::Trim(label)); + } + } else if (field.type != "IGNORE") { + auto node_property = node.SetProperty(acc.NameToProperty(field.name), + StringToValue(value, field.type)); + if (!node_property.HasValue()) + throw LoadException("Couldn't add property '{}' to the node", + field.name); + if (!*node_property) + throw LoadException("The property '{}' already exists", field.name); + } + } + for (const auto &label : additional_labels) { + auto node_label = node.AddLabel(acc.NameToLabel(utils::Trim(label))); + if (!node_label.HasValue()) + throw LoadException("Couldn't add label '{}' to the node", + utils::Trim(label)); + if (!*node_label) + throw LoadException("The label '{}' already exists", utils::Trim(label)); + } + if (!id) throw LoadException("Node ID must be specified"); + if (acc.Commit().HasError()) throw LoadException("Couldn't store the node"); +} + +void ProcessNodes(storage::Storage *store, const std::string &nodes_path, + std::unordered_map *node_id_map, + const std::vector &additional_labels) { + std::ifstream nodes_file(nodes_path); + CHECK(nodes_file) << "Unable to open '" << nodes_path << "'"; + uint64_t row_number = 1; + try { + auto [fields, header_lines] = ReadHeader(nodes_file); + row_number += header_lines; + while (true) { + auto [row, lines_count] = ReadRow(nodes_file); + if (lines_count == 0) break; + if (row.size() != fields.size()) + throw LoadException( + "Expected as many values as there are header fields (found {}, " + "expected {})", + row.size(), fields.size()); + ProcessNodeRow(store, fields, row, additional_labels, node_id_map); + row_number += lines_count; + } + } catch (const LoadException &e) { + LOG(FATAL) << "Couldn't process row " << row_number << " of '" << nodes_path + << "' because of: " << e.what(); + } +} + +/// @throw LoadException +void ProcessRelationshipsRow( + storage::Storage *store, const std::vector &fields, + const std::vector &row, + const std::unordered_map &node_id_map) { + std::optional start_id; + std::optional end_id; + std::optional relationship_type; + std::map properties; + for (size_t i = 0; i < row.size(); ++i) { + const auto &field = fields[i]; + std::string value(utils::Trim(row[i])); + if (utils::StartsWith(field.type, "START_ID")) { + if (start_id) throw LoadException("Only one node ID must be specified"); + NodeId node_id{value, GetIdSpace(field.type)}; + auto it = node_id_map.find(node_id); + if (it == node_id_map.end()) + throw LoadException("Node with id '{}' does not exist", node_id); + start_id = it->second; + } else if (utils::StartsWith(field.type, "END_ID")) { + if (end_id) throw LoadException("Only one node ID must be specified"); + NodeId node_id{value, GetIdSpace(field.type)}; + auto it = node_id_map.find(node_id); + if (it == node_id_map.end()) + throw LoadException("Node with id '{}' does not exist", node_id); + end_id = it->second; + } else if (field.type == "TYPE") { + if (relationship_type) + throw LoadException("Only one relationship TYPE must be specified"); + relationship_type = value; + } else if (field.type != "IGNORE") { + properties[field.name] = StringToValue(value, field.type); + } + } + auto rel_type = utils::Trim(FLAGS_relationship_type); + if (!rel_type.empty()) { + relationship_type = rel_type; + } + if (!start_id) throw LoadException("START_ID must be set"); + if (!end_id) throw LoadException("END_ID must be set"); + if (!relationship_type) throw LoadException("Relationship TYPE must be set"); + + auto acc = store->Access(); + 
auto from_node = acc.FindVertex(*start_id, storage::View::NEW); + if (!from_node) throw LoadException("From node must be in the storage"); + auto to_node = acc.FindVertex(*end_id, storage::View::NEW); + if (!to_node) throw LoadException("To node must be in the storage"); + + auto relationship = acc.CreateEdge(&*from_node, &*to_node, + acc.NameToEdgeType(*relationship_type)); + if (!relationship.HasValue()) + throw LoadException("Couldn't create the relationship"); + + if (acc.Commit().HasError()) + throw LoadException("Couldn't store the relationship"); +} + +void ProcessRelationships( + storage::Storage *store, const std::string &relationships_path, + const std::unordered_map &node_id_map) { + std::ifstream relationships_file(relationships_path); + CHECK(relationships_file) << "Unable to open '" << relationships_path << "'"; + uint64_t row_number = 1; + try { + auto [fields, header_lines] = ReadHeader(relationships_file); + row_number += header_lines; + while (true) { + auto [row, lines_count] = ReadRow(relationships_file); + if (lines_count == 0) break; + if (row.size() != fields.size()) + throw LoadException( + "Expected as many values as there are header fields (found {}, " + "expected {})", + row.size(), fields.size()); + ProcessRelationshipsRow(store, fields, row, node_id_map); + row_number += lines_count; + } + } catch (const LoadException &e) { + LOG(FATAL) << "Couldn't process row " << row_number << " of '" + << relationships_path << "' because of: " << e.what(); + } +} + +static const char *usage = + "[OPTION]... [--out=SNAPSHOT_FILE] [--nodes=CSV_FILE]... " + "[--relationships=CSV_FILE]...\n" + "Create a Memgraph recovery snapshot file from CSV.\n"; + +int main(int argc, char *argv[]) { + gflags::SetUsageMessage(usage); + gflags::SetVersionString(version_string); + + auto nodes = ParseRepeatedFlag("nodes", argc, argv); + auto additional_labels = ParseRepeatedFlag("node-label", argc, argv); + auto relationships = ParseRepeatedFlag("relationships", argc, argv); + + // Load config before parsing arguments, so that flags from the command line + // overwrite the config. + LoadConfig("mg_import_csv"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + CHECK(!nodes.empty()) << "The --nodes flag is required!"; + + // Verify that the user that started the Memgraph process is the same user + // that is the owner of the data directory. + VerifyDataDirectoryOwnerAndProcessUser(FLAGS_data_directory); + + { + auto all_inputs = nodes; + all_inputs.insert(all_inputs.end(), relationships.begin(), + relationships.end()); + LOG(INFO) << "Loading " << utils::Join(all_inputs, ", "); + } + + std::unordered_map node_id_map; + storage::Storage store{ + {.durability = + {.storage_directory = FLAGS_data_directory, + .recover_on_startup = false, + .snapshot_wal_mode = + storage::Config::Durability::SnapshotWalMode::DISABLED, + .snapshot_on_exit = true}, + .items = { + .properties_on_edges = FLAGS_storage_properties_on_edges, + }}}; + + utils::Timer load_timer; + + // Process all nodes files. + for (const auto &nodes_file : nodes) { + ProcessNodes(&store, nodes_file, &node_id_map, additional_labels); + } + + // Process all relationships files. + for (const auto &relationships_file : relationships) { + ProcessRelationships(&store, relationships_file, node_id_map); + } + + double load_sec = load_timer.Elapsed().count(); + LOG(INFO) << "Loaded all data in " << fmt::format("{:.3f}", load_sec) << " s"; + + // The snapshot will be created in the storage destructor. 
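  // For reference, a typical invocation of this tool (the CSV file names here
  // are purely illustrative) would look something like:
  //
  //   mg_import_csv --nodes=nodes.csv --relationships=relationships.csv \
  //       --data-directory=/var/lib/memgraph
  //
  // Starting memgraph afterwards with --storage-recover-on-startup and the
  // same --data-directory picks up the snapshot written on exit, which is
  // exactly what the integration test runner below does.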
+ + return 0; +} diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index 217678d8f..3ef0f4e3d 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -33,3 +33,6 @@ add_subdirectory(audit) # ldap test binaries add_subdirectory(ldap) + +# mg_import_csv test binaries +add_subdirectory(mg_import_csv) diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml index ef44059f0..56911d084 100644 --- a/tests/integration/apollo_runs.yaml +++ b/tests/integration/apollo_runs.yaml @@ -55,6 +55,16 @@ - ../../../build_debug/tests/integration/ldap/tester # tester binary enable_network: true +- name: integration__mg_import_csv + cd: mg_import_csv + commands: ./runner.py + infiles: + - runner.py # runner script + - tests # tests directory + - ../../../build_debug/memgraph # memgraph binary + - ../../../build_debug/src/mg_import_csv # mg_import_csv binary + - ../../../build_debug/tests/integration/mg_import_csv/tester # tester binary + #- name: integration__ha_basic # cd: ha/basic # commands: TIMEOUT=480 ./runner.py diff --git a/tests/integration/mg_import_csv/CMakeLists.txt b/tests/integration/mg_import_csv/CMakeLists.txt new file mode 100644 index 000000000..4b3540e8c --- /dev/null +++ b/tests/integration/mg_import_csv/CMakeLists.txt @@ -0,0 +1,6 @@ +set(target_name memgraph__integration__mg_import_csv) +set(tester_target_name ${target_name}__tester) + +add_executable(${tester_target_name} tester.cpp) +set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) +target_link_libraries(${tester_target_name} mg-communication json) diff --git a/tests/integration/mg_import_csv/runner.py b/tests/integration/mg_import_csv/runner.py new file mode 100755 index 000000000..09376b7d2 --- /dev/null +++ b/tests/integration/mg_import_csv/runner.py @@ -0,0 +1,158 @@ +#!/usr/bin/python3 -u +import argparse +import atexit +import os +import subprocess +import sys +import tempfile +import time +import yaml + + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) + + +def wait_for_server(port, delay=0.1): + cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] + while subprocess.call(cmd) != 0: + time.sleep(0.01) + time.sleep(delay) + + +def get_build_dir(): + if os.path.exists(os.path.join(BASE_DIR, "build_release")): + return os.path.join(BASE_DIR, "build_release") + if os.path.exists(os.path.join(BASE_DIR, "build_debug")): + return os.path.join(BASE_DIR, "build_debug") + if os.path.exists(os.path.join(BASE_DIR, "build_community")): + return os.path.join(BASE_DIR, "build_community") + return os.path.join(BASE_DIR, "build") + + +def extract_rows(data): + return list(map(lambda x: x.strip(), data.strip().split("\n"))) + + +def list_to_string(data): + ret = "[\n" + for row in data: + ret += " " + row + "\n" + ret += "]" + return ret + + +def execute_test(name, test_path, test_config, memgraph_binary, + mg_import_csv_binary, tester_binary): + print("\033[1;36m~~ Executing test", name, "~~\033[0m") + storage_directory = tempfile.TemporaryDirectory() + + # Verify test configuration + if ("import_should_fail" not in test_config and + "expected" not in test_config) or \ + ("import_should_fail" in test_config and + "expected" in test_config): + raise Exception("The test should specify either 'import_should_fail' " + "or 'expected'!") + + # Load test expected queries + import_should_fail = test_config.pop("import_should_fail", False) + expected_path = 
test_config.pop("expected", "") + if expected_path: + with open(os.path.join(test_path, expected_path)) as f: + queries_expected = extract_rows(f.read()) + else: + queries_expected = "" + + # Generate common args + properties_on_edges = bool(test_config.pop("properties_on_edges", False)) + common_args = ["--data-directory", storage_directory.name, + "--storage-properties-on-edges=" + + str(properties_on_edges).lower()] + + # Generate mg_import_csv args using flags specified in the test + mg_import_csv_args = [mg_import_csv_binary] + common_args + for key, value in test_config.items(): + flag = "--" + key.replace("_", "-") + if type(value) == list: + for item in value: + mg_import_csv_args.extend([flag, str(item)]) + elif type(value) == bool: + mg_import_csv_args.append(flag + "=" + str(value).lower()) + else: + mg_import_csv_args.extend([flag, str(value)]) + + # Execute mg_import_csv + ret = subprocess.run(mg_import_csv_args, cwd=test_path) + + # Check the return code + if import_should_fail: + if ret.returncode == 0: + raise Exception("The import should have failed, but it " + "succeeded instead!") + else: + print("\033[1;32m~~ Test successful ~~\033[0m\n") + return + else: + if ret.returncode != 0: + raise Exception("The import should have succeeded, but it " + "failed instead!") + + # Start the memgraph binary + memgraph_args = [memgraph_binary, "--storage-recover-on-startup"] + \ + common_args + memgraph = subprocess.Popen(list(map(str, memgraph_args))) + time.sleep(0.1) + assert memgraph.poll() is None, "Memgraph process died prematurely!" + wait_for_server(7687) + + # Register cleanup function + @atexit.register + def cleanup(): + if memgraph.poll() is None: + memgraph.terminate() + assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!" + + # Get the contents of the database + queries_got = extract_rows(subprocess.run( + [tester_binary], stdout=subprocess.PIPE, + check=True).stdout.decode("utf-8")) + + # Shutdown the memgraph binary + memgraph.terminate() + assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!" 
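    # The comparison below sorts both lists of queries so that the check does
    # not depend on the order in which the database dumps its statements.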
+ + # Verify the queries + queries_expected.sort() + queries_got.sort() + assert queries_got == queries_expected, "Expected\n{}\nto be equal to\n" \ + "{}".format(list_to_string(queries_got), + list_to_string(queries_expected)) + print("\033[1;32m~~ Test successful ~~\033[0m\n") + + +if __name__ == "__main__": + memgraph_binary = os.path.join(get_build_dir(), "memgraph") + mg_import_csv_binary = os.path.join( + get_build_dir(), "src", "mg_import_csv") + tester_binary = os.path.join( + get_build_dir(), "tests", "integration", "mg_import_csv", "tester") + + parser = argparse.ArgumentParser() + parser.add_argument("--memgraph", default=memgraph_binary) + parser.add_argument("--mg-import-csv", default=mg_import_csv_binary) + parser.add_argument("--tester", default=tester_binary) + args = parser.parse_args() + + test_dir = os.path.join(SCRIPT_DIR, "tests") + for name in sorted(os.listdir(test_dir)): + print("\033[1;34m~~ Processing tests from", name, "~~\033[0m\n") + test_path = os.path.join(test_dir, name) + with open(os.path.join(test_path, "test.yaml")) as f: + testcases = yaml.safe_load(f) + for test_config in testcases: + test_name = name + "/" + test_config.pop("name") + execute_test(test_name, test_path, test_config, args.memgraph, + args.mg_import_csv, args.tester) + + sys.exit(0) diff --git a/tests/integration/mg_import_csv/tester.cpp b/tests/integration/mg_import_csv/tester.cpp new file mode 100644 index 000000000..ab5b6de76 --- /dev/null +++ b/tests/integration/mg_import_csv/tester.cpp @@ -0,0 +1,40 @@ +#include +#include + +#include "communication/bolt/client.hpp" +#include "io/network/endpoint.hpp" +#include "io/network/utils.hpp" + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_int32(port, 7687, "Server port"); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); +DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); + +/** + * Executes "DUMP DATABASE" and outputs all results to stdout. On any errors it + * exits with a non-zero exit code. 
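 * Each returned row is expected to contain exactly one value: a single Cypher
 * statement, which is printed verbatim.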
+ */ +// NOLINTNEXTLINE(bugprone-exception-escape) +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + communication::Init(); + + io::network::Endpoint endpoint(io::network::ResolveHostname(FLAGS_address), + FLAGS_port); + + communication::ClientContext context(FLAGS_use_ssl); + communication::bolt::Client client(&context); + + client.Connect(endpoint, FLAGS_username, FLAGS_password); + auto ret = client.Execute("DUMP DATABASE", {}); + for (const auto &row : ret.records) { + CHECK(row.size() == 1) << "Too much entries in query dump row (got " + << row.size() << ", expected 1)!"; + std::cout << row[0].ValueString() << std::endl; + } + + return 0; +} diff --git a/tests/integration/mg_import_csv/tests/empty_lines/nodes.csv b/tests/integration/mg_import_csv/tests/empty_lines/nodes.csv new file mode 100644 index 000000000..1a1350bc6 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/empty_lines/nodes.csv @@ -0,0 +1,7 @@ +:ID,value +0,foo +1,bar + + + +baz diff --git a/tests/integration/mg_import_csv/tests/empty_lines/test.yaml b/tests/integration/mg_import_csv/tests/empty_lines/test.yaml new file mode 100644 index 000000000..82e7346bf --- /dev/null +++ b/tests/integration/mg_import_csv/tests/empty_lines/test.yaml @@ -0,0 +1,3 @@ +- name: empty_lines + nodes: "nodes.csv" + import_should_fail: True diff --git a/tests/integration/mg_import_csv/tests/header_wrong_case/nodes.csv b/tests/integration/mg_import_csv/tests/header_wrong_case/nodes.csv new file mode 100644 index 000000000..981488fdf --- /dev/null +++ b/tests/integration/mg_import_csv/tests/header_wrong_case/nodes.csv @@ -0,0 +1,2 @@ +:id,value +0,foo diff --git a/tests/integration/mg_import_csv/tests/header_wrong_case/test.yaml b/tests/integration/mg_import_csv/tests/header_wrong_case/test.yaml new file mode 100644 index 000000000..8096d9cce --- /dev/null +++ b/tests/integration/mg_import_csv/tests/header_wrong_case/test.yaml @@ -0,0 +1,3 @@ +- name: magic_fields_lower_case + nodes: "nodes.csv" + import_should_fail: True diff --git a/tests/integration/mg_import_csv/tests/missing_end_quote/nodes.csv b/tests/integration/mg_import_csv/tests/missing_end_quote/nodes.csv new file mode 100644 index 000000000..9dccc7094 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/missing_end_quote/nodes.csv @@ -0,0 +1,2 @@ +id:ID,value +0,"value diff --git a/tests/integration/mg_import_csv/tests/missing_end_quote/test.yaml b/tests/integration/mg_import_csv/tests/missing_end_quote/test.yaml new file mode 100644 index 000000000..db133a167 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/missing_end_quote/test.yaml @@ -0,0 +1,3 @@ +- name: quote_not_closed + nodes: "nodes.csv" + import_should_fail: True diff --git a/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/expected.cypher b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/expected.cypher new file mode 100644 index 000000000..efa3ef14e --- /dev/null +++ b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/expected.cypher @@ -0,0 +1,18 @@ +CREATE INDEX ON :__mg_vertex__(__mg_id__); +CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 0, content: "yes", browser: "Chrome", id: "0", country: "Croatia"}); +CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 1, content: "thanks", browser: "Chrome", id: "1", country: "United Kingdom"}); +CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 2, content: "LOL", id: "2", 
country: "Germany"}); +CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 3, content: "I see", browser: "Firefox", id: "3", country: "France"}); +CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 4, content: "fine", browser: "Internet Explorer", id: "4", country: "Italy"}); +CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 5, title: "General", id: "0"}); +CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 6, title: "Support", id: "1"}); +CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 7, title: "Music", id: "2"}); +CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 8, title: "Film", id: "3"}); +CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 9, title: "Programming", id: "4"}); +MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 0 AND v.__mg_id__ = 5 CREATE (u)-[:POSTED_ON]->(v); +MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 1 AND v.__mg_id__ = 6 CREATE (u)-[:POSTED_ON]->(v); +MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 2 AND v.__mg_id__ = 7 CREATE (u)-[:POSTED_ON]->(v); +MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 3 AND v.__mg_id__ = 8 CREATE (u)-[:POSTED_ON]->(v); +MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 4 AND v.__mg_id__ = 9 CREATE (u)-[:POSTED_ON]->(v); +DROP INDEX ON :__mg_vertex__(__mg_id__); +MATCH (u) REMOVE u:__mg_vertex__, u.__mg_id__; diff --git a/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/nodes_comment.csv b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/nodes_comment.csv new file mode 100644 index 000000000..fe244948b --- /dev/null +++ b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/nodes_comment.csv @@ -0,0 +1,7 @@ +id:ID(COMMENT_ID)|country|browser:string|content:string|:LABEL +0|Croatia|Chrome|yes|Message;Comment +1|ÖUnited + KingdomÖ|Chrome|thanks|Message;Comment +2|Germany||LOL|Message;Comment +3|France|Firefox|I see|Message;Comment +4|Italy|Internet Explorer|fine|Message;Comment diff --git a/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/nodes_forum.csv b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/nodes_forum.csv new file mode 100644 index 000000000..cc91c0fd4 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/nodes_forum.csv @@ -0,0 +1,6 @@ +id:ID(FORUM_ID)|title:string|:LABEL|emptyColumn +0|General|Forum| +1|Support|Forum| +2|Music|Forum| +3|Film|Forum| +4|Programming|Forum| diff --git a/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/relationships_0.csv b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/relationships_0.csv new file mode 100644 index 000000000..63c7f90ec --- /dev/null +++ b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/relationships_0.csv @@ -0,0 +1,4 @@ +:START_ID(COMMENT_ID)|:END_ID(FORUM_ID)|:TYPE +0|0|POSTED_ON +1|1|POSTED_ON +2|2|POSTED_ON diff --git a/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/relationships_1.csv b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/relationships_1.csv new file mode 100644 index 000000000..2a3b828d6 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/relationships_1.csv @@ -0,0 +1,3 @@ +:START_ID(COMMENT_ID)|:END_ID(FORUM_ID)|:TYPE +3|3|POSTED_ON +4|4|POSTED_ON diff --git a/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/test.yaml 
b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/test.yaml new file mode 100644 index 000000000..c70691c5c --- /dev/null +++ b/tests/integration/mg_import_csv/tests/multi_char_quote_and_node_label/test.yaml @@ -0,0 +1,44 @@ +- name: good_configuration + nodes: + - nodes_comment.csv + - nodes_forum.csv + node_label: + - First + - Second + relationships: + - relationships_0.csv + - relationships_1.csv + delimiter: "|" + quote: "Ö" + array_delimiter: ";" + expected: expected.cypher + +- name: wrong_delimiter + nodes: + - nodes_comment.csv + - nodes_forum.csv + node_label: + - First + - Second + relationships: + - relationships_0.csv + - relationships_1.csv + delimiter: "," + quote: "Ö" + array_delimiter: ";" + import_should_fail: True + +- name: wrong_quote + nodes: + - nodes_comment.csv + - nodes_forum.csv + node_label: + - First + - Second + relationships: + - relationships_0.csv + - relationships_1.csv + delimiter: "|" + quote: "\"" + array_delimiter: ";" + import_should_fail: True diff --git a/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/expected.cypher b/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/expected.cypher new file mode 100644 index 000000000..3774d8eb1 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/expected.cypher @@ -0,0 +1,11 @@ +CREATE INDEX ON :__mg_vertex__(__mg_id__); +CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 0, id: "0", country: "Croatia", browser: "Chrome", content: "yes"}); +CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 1, id: "1", country: "United Kingdom", browser: "Chrome", content: "thanks"}); +CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 2, id: "2", country: "Germany", content: "LOL"}); +CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 3, id: "3", country: "France", browser: "Firefox", content: "I see"}); +CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 4, id: "4", country: "Italy", browser: "Internet Explorer", content: "fine"}); +MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 0 AND v.__mg_id__ = 1 CREATE (u)-[:TYPE]->(v); +MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 0 AND v.__mg_id__ = 2 CREATE (u)-[:TYPE]->(v); +MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 1 AND v.__mg_id__ = 2 CREATE (u)-[:TYPE]->(v); +DROP INDEX ON :__mg_vertex__(__mg_id__); +MATCH (u) REMOVE u:__mg_vertex__, u.__mg_id__; diff --git a/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/nodes_comment.csv b/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/nodes_comment.csv new file mode 100644 index 000000000..d0e4c2356 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/nodes_comment.csv @@ -0,0 +1,6 @@ +id:ID(COMMENT_ID)|country:string|browser:string|content:string|:LABEL +0|Croatia|Chrome|yes|Message;Comment +1|"United Kingdom"|Chrome|thanks|Message;Comment +2|Germany||LOL|Message;Comment +3|France|Firefox|I see|Message;Comment +4|Italy|Internet Explorer|fine|Message;Comment diff --git a/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/relationships.csv b/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/relationships.csv new file mode 100644 index 000000000..c3f6717b1 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/relationships.csv @@ -0,0 +1,4 @@ 
+:START_ID(COMMENT_ID)|:END_ID(COMMENT_ID)|:TYPE +0|1|LIKES +1|2|FOLLOWS +0|2|KNOWS diff --git a/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/test.yaml b/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/test.yaml new file mode 100644 index 000000000..9b17e9018 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/single_char_quote_and_relationship_type/test.yaml @@ -0,0 +1,21 @@ +- name: good_configuration + nodes: + - nodes_comment.csv + relationships: + - relationships.csv + relationship_type: "TYPE " + delimiter: "|" + quote: "\"" + array_delimiter: ";" + expected: expected.cypher + +- name: wrong_delimiter + nodes: + - nodes_comment.csv + relationships: + - relationships.csv + relationship_type: "TYPE " + delimiter: "," + quote: "\"" + array_delimiter: ";" + import_should_fail: True diff --git a/tests/integration/mg_import_csv/tests/wrong_delimiter/nodes.csv b/tests/integration/mg_import_csv/tests/wrong_delimiter/nodes.csv new file mode 100644 index 000000000..e7b7f25bd --- /dev/null +++ b/tests/integration/mg_import_csv/tests/wrong_delimiter/nodes.csv @@ -0,0 +1,2 @@ +:ID,value +0,foo diff --git a/tests/integration/mg_import_csv/tests/wrong_delimiter/test.yaml b/tests/integration/mg_import_csv/tests/wrong_delimiter/test.yaml new file mode 100644 index 000000000..e5de28e75 --- /dev/null +++ b/tests/integration/mg_import_csv/tests/wrong_delimiter/test.yaml @@ -0,0 +1,4 @@ +- name: wrong_delimiter + nodes: "nodes.csv" + delimiter: "-" + import_should_fail: True diff --git a/tools/.gitignore b/tools/.gitignore deleted file mode 100644 index 698a40213..000000000 --- a/tools/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -build/ -mg_import_csv
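A note on the header convention exercised by the test CSV files above: a field typed ID, START_ID or END_ID may carry an optional ID space in parentheses (e.g. ":START_ID(COMMENT_ID)" in relationships_0.csv), and GetIdSpace in mg_import_csv.cpp extracts that space with a regular expression. The snippet below is a minimal, standalone sketch of the same behaviour; it is not part of the patch, and the helper name IdSpaceOf and the sample strings are made up for illustration only.

#include <iostream>
#include <regex>
#include <string>

// Same pattern as GetIdSpace in mg_import_csv.cpp: [START_|END_]ID[(<space>)].
std::string IdSpaceOf(const std::string &field_type) {
  std::regex format(R"(^(START_|END_)?ID(\(([^\(\)]+)\))?$)");
  std::smatch match;
  if (!std::regex_match(field_type, match, format)) return "<not an ID field>";
  // Capture group 3 holds the ID space; it is empty when the parentheses are
  // omitted, i.e. when the ID lives in the unnamed default space.
  return match[3];
}

int main() {
  std::cout << IdSpaceOf("ID") << "\n";                    // empty (default space)
  std::cout << IdSpaceOf("START_ID(COMMENT_ID)") << "\n";  // COMMENT_ID
  std::cout << IdSpaceOf("END_ID(FORUM_ID)") << "\n";      // FORUM_ID
  std::cout << IdSpaceOf("LABEL") << "\n";                 // <not an ID field>
}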