Implement mg_import_csv

Summary:
This diff restores (and fixes) the old mg_import_csv implementation. The
importer now supports the new storage engine.

Reviewers: teon.banek, ipaljak

Reviewed By: teon.banek, ipaljak

Subscribers: buda, pullbot

Differential Revision: https://phabricator.memgraph.io/D2690
This commit is contained in:
Matej Ferencevic 2020-03-03 10:38:50 +01:00
parent ec0caad44c
commit a456d6cdc0
28 changed files with 951 additions and 10 deletions

View File

@ -80,10 +80,6 @@ add_custom_command(TARGET memgraph POST_BUILD
BYPRODUCTS ${CMAKE_BINARY_DIR}/config/memgraph.conf
COMMENT "Generating memgraph configuration file")
# ----------------------------------------------------------------------------
# END Memgraph Single Node v2 Executable
# ----------------------------------------------------------------------------
# Everything here is under "memgraph" install component.
set(CMAKE_INSTALL_DEFAULT_COMPONENT_NAME "memgraph")
@ -112,3 +108,27 @@ endif()
# Create empty directories for default location of lib and log.
install(CODE "file(MAKE_DIRECTORY \$ENV{DESTDIR}/var/log/memgraph
\$ENV{DESTDIR}/var/lib/memgraph)")
# ----------------------------------------------------------------------------
# END Memgraph Single Node v2 Executable
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Memgraph CSV Import Tool Executable
# ----------------------------------------------------------------------------
add_executable(mg_import_csv mg_import_csv.cpp)
# PRIVATE: the storage engine is an implementation detail of the importer;
# nothing links against an executable anyway.
target_link_libraries(mg_import_csv PRIVATE mg-storage-v2)
# Strip the executable in release build. The comparison operand is quoted so
# an unset lower_build_type can't be re-dereferenced by if().
if ("${lower_build_type}" STREQUAL "release")
  add_custom_command(TARGET mg_import_csv POST_BUILD
    COMMAND strip -s $<TARGET_FILE:mg_import_csv>
    COMMENT "Stripping symbols and sections from mg_import_csv"
    VERBATIM)
endif()
install(TARGETS mg_import_csv RUNTIME DESTINATION bin)
# ----------------------------------------------------------------------------
# END Memgraph CSV Import Tool Executable
# ----------------------------------------------------------------------------

View File

@ -1,5 +1,13 @@
#pragma once
#include <pwd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <filesystem>
#include <string>
#include <vector>
@ -13,7 +21,7 @@
/// 1) /etc/memgraph/memgraph.conf
/// 2) ~/.memgraph/config
/// 3) env - MEMGRAPH_CONFIG
void LoadConfig() {
inline void LoadConfig(const std::string &product_name) {
namespace fs = std::filesystem;
std::vector<fs::path> configs = {fs::path("/etc/memgraph/memgraph.conf")};
if (getenv("HOME") != nullptr)
@ -40,7 +48,7 @@ void LoadConfig() {
int custom_argc = static_cast<int>(flagfile_arguments.size()) + 1;
char **custom_argv = new char *[custom_argc];
custom_argv[0] = strdup(std::string("memgraph").c_str());
custom_argv[0] = strdup(product_name.c_str());
for (int i = 0; i < static_cast<int>(flagfile_arguments.size()); ++i) {
custom_argv[i + 1] = strdup(flagfile_arguments[i].c_str());
}
@ -53,3 +61,36 @@ void LoadConfig() {
for (int i = 0; i < custom_argc; ++i) free(custom_argv[i]);
delete[] custom_argv;
}
/// Verifies that the owner of the data directory is the same user that started
/// the current process. Aborts (via CHECK) with a user-friendly message when
/// they differ; does nothing when the directory doesn't exist yet.
inline void VerifyDataDirectoryOwnerAndProcessUser(
    const std::string &data_directory) {
  // Get the effective user ID of the current process (the user the process
  // is actually running as).
  auto process_euid = geteuid();

  // Get the data directory owner ID.
  struct stat statbuf;
  auto ret = stat(data_directory.c_str(), &statbuf);
  if (ret != 0 && errno == ENOENT) {
    // The directory doesn't currently exist; it will be created by this
    // process (and therefore owned by it), so there is nothing to verify.
    return;
  }
  // Any other stat failure (permissions, dangling symlink, ...) is fatal.
  CHECK(ret == 0) << "Couldn't get stat for '" << data_directory
                  << "' because of: " << strerror(errno) << " (" << errno
                  << ")";

  auto directory_owner = statbuf.st_uid;

  // Resolve a UID to a username; falls back to the numeric UID rendered as a
  // string when the UID has no passwd entry.
  auto get_username = [](auto uid) {
    auto info = getpwuid(uid);
    if (!info) return std::to_string(uid);
    return std::string(info->pw_name);
  };

  auto user_process = get_username(process_euid);
  auto user_directory = get_username(directory_owner);
  // Report usernames (not raw UIDs) in the abort message.
  CHECK(process_euid == directory_owner)
      << "The process is running as user " << user_process
      << ", but the data directory is owned by user " << user_directory
      << ". Please start the process as user " << user_directory << "!";
}

View File

@ -20,8 +20,8 @@
#include "communication/init.hpp"
#include "communication/server.hpp"
#include "communication/session.hpp"
#include "config.hpp"
#include "glue/communication.hpp"
#include "helpers.hpp"
#include "py/py.hpp"
#include "query/exceptions.hpp"
#include "query/interpreter.hpp"
@ -70,6 +70,8 @@ DEFINE_string(bolt_server_name_for_init, "",
"Bolt INIT message.");
// General purpose flags.
// NOTE: The `data_directory` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well.
DEFINE_string(data_directory, "mg_data",
"Path to directory in which to save all permanent data.");
DEFINE_string(log_file, "", "Path to where the log should be stored.");
@ -85,6 +87,8 @@ DEFINE_uint64(memory_warning_threshold, 1024,
DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30,
"Storage garbage collector interval (in seconds).",
FLAG_IN_RANGE(1, 24 * 3600));
// NOTE: The `storage_properties_on_edges` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well.
DEFINE_bool(storage_properties_on_edges, false,
"Controls whether edges have properties.");
DEFINE_bool(storage_recover_on_startup, false,
@ -811,7 +815,7 @@ int main(int argc, char **argv) {
// Load config before parsing arguments, so that flags from the command line
// overwrite the config.
LoadConfig();
LoadConfig("memgraph");
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
@ -821,6 +825,10 @@ int main(int argc, char **argv) {
// Unhandled exception handler init.
std::set_terminate(&utils::TerminateHandler);
// Verify that the user that started the Memgraph process is the same user
// that is the owner of the data directory.
VerifyDataDirectoryOwnerAndProcessUser(FLAGS_data_directory);
// Initialize Python
auto *program_name = Py_DecodeLocale(argv[0], nullptr);
CHECK(program_name);

507
src/mg_import_csv.cpp Normal file
View File

@ -0,0 +1,507 @@
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstdio>
#include <filesystem>
#include <fstream>
#include <map>
#include <optional>
#include <regex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "helpers.hpp"
#include "storage/v2/storage.hpp"
#include "utils/exceptions.hpp"
#include "utils/string.hpp"
#include "utils/timer.hpp"
#include "version.hpp"
// gflags validator: rejects flag values containing a newline character.
// Prints a diagnostic and returns false on rejection; returns true otherwise.
bool ValidateNotNewline(const char *flagname, const std::string &value) {
  if (value.find('\n') != std::string::npos) {
    printf("The argument '%s' cannot contain newline character\n", flagname);
    return false;
  }
  return true;
}
// gflags validator: rejects flag values that are whitespace-only or that
// contain embedded whitespace. An empty value is allowed.
// Prints a diagnostic and returns false on rejection; returns true otherwise.
bool ValidateNoWhitespace(const char *flagname, const std::string &value) {
  auto trimmed = utils::Trim(value);
  if (trimmed.empty() && !value.empty()) {
    printf("The argument '%s' cannot be only whitespace\n", flagname);
    return false;
  }
  for (auto c : trimmed) {
    // Cast to unsigned char: passing a plain (possibly negative) char to
    // std::isspace is undefined behavior for non-ASCII bytes, e.g. the
    // UTF-8 encoded 'Ö' quote character used by the integration tests.
    if (std::isspace(static_cast<unsigned char>(c))) {
      printf("The argument '%s' cannot contain whitespace\n", flagname);
      return false;
    }
  }
  return true;
}
// Memgraph flags.
// NOTE: These flags must be identical as the flags in the main Memgraph binary.
// They are used to automatically load the same configuration as the main
// Memgraph binary so that the flags don't need to be specified when importing a
// CSV file on a correctly set-up Memgraph installation.
DEFINE_string(data_directory, "mg_data",
"Path to directory in which to save all permanent data.");
DEFINE_bool(storage_properties_on_edges, false,
"Controls whether relationships have properties.");
// CSV import flags.
DEFINE_string(array_delimiter, ";",
"Delimiter between elements of array values.");
DEFINE_string(delimiter, ",", "Delimiter between each field in the CSV.");
DEFINE_string(quote, "\"",
"Quotation character for data in the CSV. Cannot contain '\n'");
DEFINE_validator(quote, &ValidateNotNewline);
DEFINE_bool(skip_duplicate_nodes, false,
"Set to true to skip duplicate nodes instead of raising an error.");
// Arguments `--nodes` and `--relationships` can be input multiple times and are
// handled with custom parsing.
DEFINE_string(nodes, "", "CSV file containing graph nodes (vertices).");
DEFINE_string(node_label, "",
"Specify additional label for nodes. To add multiple labels, "
"repeat the flag multiple times.");
DEFINE_validator(node_label, &ValidateNoWhitespace);
DEFINE_string(relationships, "",
"CSV file containing graph relationships (edges).");
DEFINE_string(relationship_type, "",
"Overwrite the relationship type from csv with the given value.");
DEFINE_validator(relationship_type, &ValidateNoWhitespace);
// Collects every occurrence of a repeatable command line flag before gflags
// parsing (gflags is later run with remove_flags == true). Supports the
// "--flag value", "-flag value", "--flag=value" and "-flag=value" forms and
// aborts when a matched flag has no value.
std::vector<std::string> ParseRepeatedFlag(const std::string &flagname,
                                           int argc, char *argv[]) {
  std::vector<std::string> values;
  for (int i = 1; i < argc; ++i) {
    const std::string arg(argv[i]);
    // How many dashes prefixed the flag name (0 == no match).
    int dashes = 0;
    if (utils::StartsWith(arg, "--" + flagname)) {
      dashes = 2;
    } else if (utils::StartsWith(arg, "-" + flagname)) {
      dashes = 1;
    }
    if (dashes == 0) continue;
    // Everything after "-[-]<flagname>" is either empty (value is in the
    // next argument) or "=<value>".
    auto rest = arg.substr(flagname.size() + dashes);
    std::string value;
    if (rest.empty() && i + 1 < argc) {
      value = argv[++i];
    } else if (!rest.empty() && rest.front() == '=') {
      value = rest.substr(1);
    }
    CHECK(!value.empty()) << "The argument '" << flagname << "' is required";
    values.push_back(value);
  }
  return values;
}
// A field describing the CSV column.
struct Field {
  // Name of the field.
  std::string name;
  // Type of the values under this field (e.g. "string", "int", "ID",
  // "LABEL", "IGNORE", or an array type ending in "[]").
  std::string type;
};

// A node ID from CSV format.
struct NodeId {
  // Raw ID value as read from the CSV file.
  std::string id;
  // Group/space of IDs. ID must be unique in a single group.
  std::string id_space;
};

// IDs are equal only when both the value and the ID space match.
bool operator==(const NodeId &a, const NodeId &b) {
  return a.id == b.id && a.id_space == b.id_space;
}

// Human-readable form used in log and exception messages: "<id>(<id_space>)".
std::ostream &operator<<(std::ostream &stream, const NodeId &node_id) {
  return stream << node_id.id << "(" << node_id.id_space << ")";
}

namespace std {

// Hash support so NodeId can be used as an unordered_map key.
template <>
struct hash<NodeId> {
  size_t operator()(const NodeId &node_id) const {
    // Combine the two member hashes; the shift keeps id and id_space from
    // cancelling out when they hash to the same value.
    size_t id_hash = std::hash<std::string>{}(node_id.id);
    size_t id_space_hash = std::hash<std::string>{}(node_id.id_space);
    return id_hash ^ (id_space_hash << 1UL);
  }
};

}  // namespace std

// Exception used to indicate that something went wrong during data loading.
class LoadException : public utils::BasicException {
 public:
  using utils::BasicException::BasicException;
};
/// Reads a single logical CSV row from `stream`, honoring the configured
/// quote string and delimiter character. A quoted value may span multiple
/// physical lines, which is why the number of consumed lines is returned
/// alongside the parsed columns. On EOF an empty row with a line count of 0
/// is returned.
///
/// @throw LoadException when EOF is reached inside an unterminated quote
std::pair<std::vector<std::string>, uint64_t> ReadRow(std::istream &stream) {
  std::vector<std::string> row;
  bool quoting = false;
  std::vector<char> column;
  std::string line;
  uint64_t lines_count = 0;

  // Does the (possibly multi-character) quote string start at position
  // `curr_idx` of the current line?
  auto check_quote = [&line](int curr_idx) {
    return curr_idx + FLAGS_quote.size() <= line.size() &&
           line.compare(curr_idx, FLAGS_quote.size(), FLAGS_quote) == 0;
  };

  do {
    if (!std::getline(stream, line)) {
      if (quoting) {
        throw LoadException(
            "There is no more data left to load while inside a quoted string. "
            "Did you forget to close the quote?");
      } else {
        // The whole row was processed.
        break;
      }
    }
    ++lines_count;
    for (size_t i = 0; i < line.size(); ++i) {
      auto c = line[i];
      if (quoting) {
        if (check_quote(i)) {
          // Closing quote; skip past the whole quote string.
          quoting = false;
          i += FLAGS_quote.size() - 1;
        } else {
          column.push_back(c);
        }
      } else if (check_quote(i)) {
        // Opening quote. Hopefully, escaping isn't needed.
        quoting = true;
        i += FLAGS_quote.size() - 1;
      } else if (c == FLAGS_delimiter.front()) {
        // End of column.
        row.emplace_back(column.begin(), column.end());
        column.clear();
        // Handle special case when delimiter is the last
        // character in line. This means that another
        // empty column needs to be added.
        if (i == line.size() - 1) {
          row.emplace_back("");
        }
      } else {
        column.push_back(c);
      }
    }
    // Keep consuming physical lines while inside a quoted value.
  } while (quoting);

  if (!column.empty()) row.emplace_back(column.begin(), column.end());
  return {std::move(row), lines_count};
}
/// Reads the CSV header row and parses each column into a Field. A column is
/// either "name" (the type then defaults to "string") or "name:type".
/// Returns the parsed fields together with the number of physical lines
/// consumed.
///
/// @throw LoadException
std::pair<std::vector<Field>, uint64_t> ReadHeader(std::istream &stream) {
  auto [row, lines_count] = ReadRow(stream);
  std::vector<Field> fields;
  fields.reserve(row.size());
  for (const auto &value : row) {
    auto name_and_type = utils::Split(value, ":");
    if (name_and_type.size() != 1U && name_and_type.size() != 2U) {
      throw LoadException(
          "Expected a name and optionally a type, got '{}'. Did you specify a "
          "correct CSV delimiter?",
          value);
    }
    // When the type is missing the default is "string".
    std::string type("string");
    if (name_and_type.size() == 2U) {
      type = utils::Trim(name_and_type[1]);
    }
    fields.push_back(Field{name_and_type[0], type});
  }
  return {std::move(fields), lines_count};
}
/// Converts a raw CSV string into a storage::PropertyValue of the type named
/// in the header. Supported scalar types: int/long/byte/short (64-bit
/// integer), float/double, boolean, char/string. A type suffixed with "[]"
/// produces a list whose elements are split on --array-delimiter. An empty
/// string maps to Null.
///
/// @throw LoadException on an unknown type or an unparsable numeric value
storage::PropertyValue StringToValue(const std::string &str,
                                     const std::string &type) {
  // Empty string signifies Null.
  if (str.empty()) return storage::PropertyValue();
  auto convert = [](const auto &str, const auto &type) {
    if (type == "int" || type == "long" || type == "byte" || type == "short") {
      // Parse strictly: reject garbage and trailing characters instead of
      // silently storing 0 or a partially-parsed prefix (the previous
      // istringstream-based code ignored extraction failures).
      try {
        size_t pos = 0;
        int64_t val = std::stoll(str, &pos);
        if (pos != str.size())
          throw LoadException("Couldn't parse '{}' as an integer", str);
        return storage::PropertyValue(val);
      } catch (const std::invalid_argument &) {
        throw LoadException("Couldn't parse '{}' as an integer", str);
      } catch (const std::out_of_range &) {
        throw LoadException("The integer value '{}' is out of range", str);
      }
    } else if (type == "float" || type == "double") {
      return storage::PropertyValue(utils::ParseDouble(str));
    } else if (type == "boolean") {
      // Anything other than case-insensitive "true" is treated as false.
      return storage::PropertyValue(utils::ToLowerCase(str) == "true");
    } else if (type == "char" || type == "string") {
      return storage::PropertyValue(str);
    }
    throw LoadException("Unexpected type: {}", type);
  };
  // Type *not* ending with '[]' signifies a regular scalar value.
  if (!utils::EndsWith(type, "[]")) return convert(str, type);
  // Otherwise, we have an array type.
  auto elem_type = type.substr(0, type.size() - 2);
  auto elems = utils::Split(str, FLAGS_array_delimiter);
  std::vector<storage::PropertyValue> array;
  array.reserve(elems.size());
  for (const auto &elem : elems) {
    array.push_back(convert(std::string(utils::Trim(elem)), elem_type));
  }
  return storage::PropertyValue(std::move(array));
}
/// Extracts the ID space from an ID header field type.
/// The format of this field is as follows:
///   [START_|END_]ID[(<id_space>)]
/// Returns the inner <id_space>, or an empty string when none was given.
///
/// @throw LoadException when the type doesn't match the expected format
std::string GetIdSpace(const std::string &type) {
  // NOTE(review): std::regex::extended selects the POSIX ERE grammar; the
  // backslash-escaped parentheses inside the bracket expression rely on
  // implementation-lenient behavior. The default ECMAScript grammar would be
  // the more portable choice — confirm before changing.
  std::regex format(R"(^(START_|END_)?ID(\(([^\(\)]+)\))?$)",
                    std::regex::extended);
  std::smatch res;
  if (!std::regex_match(type, res, format))
    throw LoadException(
        "Expected the ID field to look like '[START_|END_]ID[(<id_space>)]', "
        "but got '{}' instead",
        type);
  // 4 sub-matches: whole match, START_/END_ prefix, "(<id_space>)", and the
  // inner <id_space> itself (index 3, empty when absent).
  CHECK(res.size() == 4) << "Invalid regex match result!";
  return res[3];
}
/// Creates a single vertex in `store` from one CSV row, committed in its own
/// transaction. Columns are interpreted according to `fields`: the "ID"
/// column records the node in `node_id_map` and is also stored as the "id"
/// property, "LABEL" columns add labels (split on --array-delimiter),
/// "IGNORE" columns are skipped, and every other column becomes a property
/// converted via StringToValue. Each of `additional_labels` (--node-label)
/// is attached as well. A duplicate ID either aborts the import or, with
/// --skip-duplicate-nodes, skips the row.
///
/// @throw LoadException
void ProcessNodeRow(storage::Storage *store, const std::vector<Field> &fields,
                    const std::vector<std::string> &row,
                    const std::vector<std::string> &additional_labels,
                    std::unordered_map<NodeId, storage::Gid> *node_id_map) {
  std::optional<NodeId> id;
  auto acc = store->Access();
  auto node = acc.CreateVertex();
  for (size_t i = 0; i < row.size(); ++i) {
    const auto &field = fields[i];
    std::string value(utils::Trim(row[i]));
    if (utils::StartsWith(field.type, "ID")) {
      if (id) throw LoadException("Only one node ID must be specified");
      NodeId node_id{value, GetIdSpace(field.type)};
      auto it = node_id_map->find(node_id);
      if (it != node_id_map->end()) {
        if (FLAGS_skip_duplicate_nodes) {
          // Return without committing, so the duplicate vertex isn't stored.
          LOG(WARNING) << "Skipping duplicate node with id '" << node_id << "'";
          return;
        } else {
          throw LoadException("Node with id '{}' already exists", node_id);
        }
      }
      node_id_map->emplace(node_id, node.Gid());
      // Preserve the original CSV ID as a regular "id" property.
      auto node_property = node.SetProperty(acc.NameToProperty("id"),
                                            storage::PropertyValue(node_id.id));
      if (!node_property.HasValue())
        throw LoadException("Couldn't add property 'id' to the node");
      if (!*node_property)
        throw LoadException("The property 'id' already exists");
      id = node_id;
    } else if (field.type == "LABEL") {
      // A LABEL column may carry multiple labels separated by the array
      // delimiter.
      for (const auto &label : utils::Split(value, FLAGS_array_delimiter)) {
        auto node_label = node.AddLabel(acc.NameToLabel(utils::Trim(label)));
        if (!node_label.HasValue())
          throw LoadException("Couldn't add label '{}' to the node",
                              utils::Trim(label));
        if (!*node_label)
          throw LoadException("The label '{}' already exists",
                              utils::Trim(label));
      }
    } else if (field.type != "IGNORE") {
      auto node_property = node.SetProperty(acc.NameToProperty(field.name),
                                            StringToValue(value, field.type));
      if (!node_property.HasValue())
        throw LoadException("Couldn't add property '{}' to the node",
                            field.name);
      if (!*node_property)
        throw LoadException("The property '{}' already exists", field.name);
    }
  }
  // Labels supplied via --node-label apply to every imported node.
  for (const auto &label : additional_labels) {
    auto node_label = node.AddLabel(acc.NameToLabel(utils::Trim(label)));
    if (!node_label.HasValue())
      throw LoadException("Couldn't add label '{}' to the node",
                          utils::Trim(label));
    if (!*node_label)
      throw LoadException("The label '{}' already exists", utils::Trim(label));
  }
  if (!id) throw LoadException("Node ID must be specified");
  if (acc.Commit().HasError()) throw LoadException("Couldn't store the node");
}
/// Imports all nodes from one CSV file into `store`, recording each node's
/// CSV ID -> storage Gid mapping in `node_id_map` for later relationship
/// resolution. Any LoadException is converted into a fatal log message that
/// names the file and the 1-based physical row number.
void ProcessNodes(storage::Storage *store, const std::string &nodes_path,
                  std::unordered_map<NodeId, storage::Gid> *node_id_map,
                  const std::vector<std::string> &additional_labels) {
  std::ifstream nodes_file(nodes_path);
  CHECK(nodes_file) << "Unable to open '" << nodes_path << "'";
  uint64_t row_number = 1;
  try {
    auto [fields, header_lines] = ReadHeader(nodes_file);
    row_number += header_lines;
    while (true) {
      auto [row, lines_count] = ReadRow(nodes_file);
      // A line count of 0 signals EOF.
      if (lines_count == 0) break;
      if (row.size() != fields.size())
        throw LoadException(
            "Expected as many values as there are header fields (found {}, "
            "expected {})",
            row.size(), fields.size());
      ProcessNodeRow(store, fields, row, additional_labels, node_id_map);
      row_number += lines_count;
    }
  } catch (const LoadException &e) {
    LOG(FATAL) << "Couldn't process row " << row_number << " of '" << nodes_path
               << "' because of: " << e.what();
  }
}
/// Creates a single edge in `store` from one CSV row, committed in its own
/// transaction. "START_ID"/"END_ID" columns are resolved to storage Gids
/// through `node_id_map`, the "TYPE" column (or the --relationship-type
/// override) provides the edge type and "IGNORE" columns are skipped.
///
/// NOTE(review): the `properties` map is collected below but never attached
/// to the created edge — property columns in relationship files are silently
/// dropped. Confirm whether this is intentional.
///
/// @throw LoadException
void ProcessRelationshipsRow(
    storage::Storage *store, const std::vector<Field> &fields,
    const std::vector<std::string> &row,
    const std::unordered_map<NodeId, storage::Gid> &node_id_map) {
  std::optional<storage::Gid> start_id;
  std::optional<storage::Gid> end_id;
  std::optional<std::string> relationship_type;
  std::map<std::string, storage::PropertyValue> properties;
  for (size_t i = 0; i < row.size(); ++i) {
    const auto &field = fields[i];
    std::string value(utils::Trim(row[i]));
    if (utils::StartsWith(field.type, "START_ID")) {
      if (start_id) throw LoadException("Only one node ID must be specified");
      NodeId node_id{value, GetIdSpace(field.type)};
      auto it = node_id_map.find(node_id);
      if (it == node_id_map.end())
        throw LoadException("Node with id '{}' does not exist", node_id);
      start_id = it->second;
    } else if (utils::StartsWith(field.type, "END_ID")) {
      if (end_id) throw LoadException("Only one node ID must be specified");
      NodeId node_id{value, GetIdSpace(field.type)};
      auto it = node_id_map.find(node_id);
      if (it == node_id_map.end())
        throw LoadException("Node with id '{}' does not exist", node_id);
      end_id = it->second;
    } else if (field.type == "TYPE") {
      if (relationship_type)
        throw LoadException("Only one relationship TYPE must be specified");
      relationship_type = value;
    } else if (field.type != "IGNORE") {
      properties[field.name] = StringToValue(value, field.type);
    }
  }
  // The --relationship-type flag overrides the TYPE column when non-empty.
  auto rel_type = utils::Trim(FLAGS_relationship_type);
  if (!rel_type.empty()) {
    relationship_type = rel_type;
  }
  if (!start_id) throw LoadException("START_ID must be set");
  if (!end_id) throw LoadException("END_ID must be set");
  if (!relationship_type) throw LoadException("Relationship TYPE must be set");
  // Look both endpoints up in this row's own transaction and create the edge.
  auto acc = store->Access();
  auto from_node = acc.FindVertex(*start_id, storage::View::NEW);
  if (!from_node) throw LoadException("From node must be in the storage");
  auto to_node = acc.FindVertex(*end_id, storage::View::NEW);
  if (!to_node) throw LoadException("To node must be in the storage");
  auto relationship = acc.CreateEdge(&*from_node, &*to_node,
                                     acc.NameToEdgeType(*relationship_type));
  if (!relationship.HasValue())
    throw LoadException("Couldn't create the relationship");
  if (acc.Commit().HasError())
    throw LoadException("Couldn't store the relationship");
}
/// Imports all relationships from one CSV file into `store`, resolving node
/// endpoints through the previously built `node_id_map`. Any LoadException
/// is converted into a fatal log message that names the file and the 1-based
/// physical row number.
void ProcessRelationships(
    storage::Storage *store, const std::string &relationships_path,
    const std::unordered_map<NodeId, storage::Gid> &node_id_map) {
  std::ifstream relationships_file(relationships_path);
  CHECK(relationships_file) << "Unable to open '" << relationships_path << "'";
  uint64_t row_number = 1;
  try {
    auto [fields, header_lines] = ReadHeader(relationships_file);
    row_number += header_lines;
    while (true) {
      auto [row, lines_count] = ReadRow(relationships_file);
      // A line count of 0 signals EOF.
      if (lines_count == 0) break;
      if (row.size() != fields.size())
        throw LoadException(
            "Expected as many values as there are header fields (found {}, "
            "expected {})",
            row.size(), fields.size());
      ProcessRelationshipsRow(store, fields, row, node_id_map);
      row_number += lines_count;
    }
  } catch (const LoadException &e) {
    LOG(FATAL) << "Couldn't process row " << row_number << " of '"
               << relationships_path << "' because of: " << e.what();
  }
}
static const char *usage =
    "[OPTION]... [--out=SNAPSHOT_FILE] [--nodes=CSV_FILE]... "
    "[--relationships=CSV_FILE]...\n"
    "Create a Memgraph recovery snapshot file from CSV.\n";

// Entry point: imports one or more node/relationship CSV files into a fresh
// storage instance and leaves behind a snapshot in the data directory.
int main(int argc, char *argv[]) {
  gflags::SetUsageMessage(usage);
  gflags::SetVersionString(version_string);

  // --nodes, --node-label and --relationships may be given multiple times,
  // so they are collected with custom parsing before gflags consumes argv.
  auto nodes = ParseRepeatedFlag("nodes", argc, argv);
  auto additional_labels = ParseRepeatedFlag("node-label", argc, argv);
  auto relationships = ParseRepeatedFlag("relationships", argc, argv);

  // Load config before parsing arguments, so that flags from the command line
  // overwrite the config.
  LoadConfig("mg_import_csv");
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  CHECK(!nodes.empty()) << "The --nodes flag is required!";

  // Verify that the user that started the process is the same user that is
  // the owner of the data directory.
  VerifyDataDirectoryOwnerAndProcessUser(FLAGS_data_directory);

  {
    auto all_inputs = nodes;
    all_inputs.insert(all_inputs.end(), relationships.begin(),
                      relationships.end());
    LOG(INFO) << "Loading " << utils::Join(all_inputs, ", ");
  }

  std::unordered_map<NodeId, storage::Gid> node_id_map;
  // Storage configuration: no recovery (fresh import), periodic
  // snapshots/WAL disabled, and a single snapshot written on exit.
  storage::Storage store{
      {.durability =
           {.storage_directory = FLAGS_data_directory,
            .recover_on_startup = false,
            .snapshot_wal_mode =
                storage::Config::Durability::SnapshotWalMode::DISABLED,
            .snapshot_on_exit = true},
       .items = {
           .properties_on_edges = FLAGS_storage_properties_on_edges,
       }}};

  utils::Timer load_timer;
  // Process all nodes files first so relationships can resolve their IDs.
  for (const auto &nodes_file : nodes) {
    ProcessNodes(&store, nodes_file, &node_id_map, additional_labels);
  }
  // Process all relationships files.
  for (const auto &relationships_file : relationships) {
    ProcessRelationships(&store, relationships_file, node_id_map);
  }
  double load_sec = load_timer.Elapsed().count();
  LOG(INFO) << "Loaded all data in " << fmt::format("{:.3f}", load_sec) << " s";

  // The snapshot will be created in the storage destructor.
  return 0;
}

View File

@ -33,3 +33,6 @@ add_subdirectory(audit)
# ldap test binaries
add_subdirectory(ldap)
# mg_import_csv test binaries
add_subdirectory(mg_import_csv)

View File

@ -55,6 +55,16 @@
- ../../../build_debug/tests/integration/ldap/tester # tester binary
enable_network: true
- name: integration__mg_import_csv
cd: mg_import_csv
commands: ./runner.py
infiles:
- runner.py # runner script
- tests # tests directory
- ../../../build_debug/memgraph # memgraph binary
- ../../../build_debug/src/mg_import_csv # mg_import_csv binary
- ../../../build_debug/tests/integration/mg_import_csv/tester # tester binary
#- name: integration__ha_basic
# cd: ha/basic
# commands: TIMEOUT=480 ./runner.py

View File

@ -0,0 +1,6 @@
set(target_name memgraph__integration__mg_import_csv)
set(tester_target_name ${target_name}__tester)

add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
# PRIVATE: these libraries are implementation details of the tester;
# nothing links against the tester executable.
target_link_libraries(${tester_target_name} PRIVATE mg-communication json)

View File

@ -0,0 +1,158 @@
#!/usr/bin/python3 -u
import argparse
import atexit
import os
import subprocess
import sys
import tempfile
import time
import yaml
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
def wait_for_server(port, delay=0.1):
    """Block until a TCP server accepts connections on 127.0.0.1:`port`.

    Polls with `nc -z` every 10 ms, then sleeps an extra `delay` seconds to
    give the server time to finish initializing.
    """
    probe = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while subprocess.call(probe) != 0:
        time.sleep(0.01)
    time.sleep(delay)
def get_build_dir():
    """Return the preferred existing build directory under BASE_DIR.

    Preference order: build_release, build_debug, build_community; falls
    back to plain "build" when none of them exist.
    """
    for candidate in ("build_release", "build_debug", "build_community"):
        path = os.path.join(BASE_DIR, candidate)
        if os.path.exists(path):
            return path
    return os.path.join(BASE_DIR, "build")
def extract_rows(data):
    """Split raw dump output into a list of one stripped string per line."""
    return [line.strip() for line in data.strip().split("\n")]
def list_to_string(data):
    """Render a list of strings as a bracketed, one-item-per-line block."""
    lines = ["["]
    for item in data:
        lines.append(" " + item)
    lines.append("]")
    return "\n".join(lines)
def execute_test(name, test_path, test_config, memgraph_binary,
                 mg_import_csv_binary, tester_binary):
    """Run a single mg_import_csv integration test.

    Imports the CSV files described by `test_config` (one entry from
    test.yaml) into a temporary data directory, then either verifies that the
    import failed (when `import_should_fail` is set) or starts memgraph on
    the imported data and compares its dump output against the queries in
    the `expected` file. Raises on any mismatch.
    """
    print("\033[1;36m~~ Executing test", name, "~~\033[0m")
    storage_directory = tempfile.TemporaryDirectory()

    # Verify test configuration: exactly one of the two outcome keys must be
    # present.
    if ("import_should_fail" not in test_config and
            "expected" not in test_config) or \
            ("import_should_fail" in test_config and
             "expected" in test_config):
        raise Exception("The test should specify either 'import_should_fail' "
                        "or 'expected'!")

    # Load test expected queries. The keys are popped so they aren't turned
    # into command line flags below.
    import_should_fail = test_config.pop("import_should_fail", False)
    expected_path = test_config.pop("expected", "")
    if expected_path:
        with open(os.path.join(test_path, expected_path)) as f:
            queries_expected = extract_rows(f.read())
    else:
        queries_expected = ""

    # Args shared by mg_import_csv and memgraph; both must agree on these for
    # the recovery of the imported data to work.
    properties_on_edges = bool(test_config.pop("properties_on_edges", False))
    common_args = ["--data-directory", storage_directory.name,
                   "--storage-properties-on-edges=" +
                   str(properties_on_edges).lower()]

    # Generate mg_import_csv args from the remaining test.yaml keys: list
    # values become repeated flags, booleans use the --flag=true/false form.
    mg_import_csv_args = [mg_import_csv_binary] + common_args
    for key, value in test_config.items():
        flag = "--" + key.replace("_", "-")
        if type(value) == list:
            for item in value:
                mg_import_csv_args.extend([flag, str(item)])
        elif type(value) == bool:
            mg_import_csv_args.append(flag + "=" + str(value).lower())
        else:
            mg_import_csv_args.extend([flag, str(value)])

    # Execute mg_import_csv from the test directory (CSV paths in test.yaml
    # are relative to it).
    ret = subprocess.run(mg_import_csv_args, cwd=test_path)

    # Check the return code against the expected outcome.
    if import_should_fail:
        if ret.returncode == 0:
            raise Exception("The import should have failed, but it "
                            "succeeded instead!")
        else:
            print("\033[1;32m~~ Test successful ~~\033[0m\n")
            return
    else:
        if ret.returncode != 0:
            raise Exception("The import should have succeeded, but it "
                            "failed instead!")

    # Start the memgraph binary so it recovers the imported data.
    memgraph_args = [memgraph_binary, "--storage-recover-on-startup"] + \
        common_args
    memgraph = subprocess.Popen(list(map(str, memgraph_args)))
    time.sleep(0.1)
    assert memgraph.poll() is None, "Memgraph process died prematurely!"
    wait_for_server(7687)

    # Register cleanup function so memgraph is stopped even when one of the
    # assertions below fails.
    @atexit.register
    def cleanup():
        if memgraph.poll() is None:
            memgraph.terminate()
        assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"

    # Get the contents of the database as dump queries.
    queries_got = extract_rows(subprocess.run(
        [tester_binary], stdout=subprocess.PIPE,
        check=True).stdout.decode("utf-8"))

    # Shutdown the memgraph binary.
    memgraph.terminate()
    assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"

    # Verify the queries, order-insensitively.
    queries_expected.sort()
    queries_got.sort()
    assert queries_got == queries_expected, "Expected\n{}\nto be equal to\n" \
        "{}".format(list_to_string(queries_got),
                    list_to_string(queries_expected))
    print("\033[1;32m~~ Test successful ~~\033[0m\n")
if __name__ == "__main__":
    # Default binary locations inside the detected build directory; each can
    # be overridden from the command line.
    memgraph_binary = os.path.join(get_build_dir(), "memgraph")
    mg_import_csv_binary = os.path.join(
        get_build_dir(), "src", "mg_import_csv")
    tester_binary = os.path.join(
        get_build_dir(), "tests", "integration", "mg_import_csv", "tester")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--mg-import-csv", default=mg_import_csv_binary)
    parser.add_argument("--tester", default=tester_binary)
    args = parser.parse_args()

    # Every subdirectory of tests/ holds a test.yaml that describes one or
    # more test cases sharing the same CSV fixtures.
    test_dir = os.path.join(SCRIPT_DIR, "tests")
    for name in sorted(os.listdir(test_dir)):
        print("\033[1;34m~~ Processing tests from", name, "~~\033[0m\n")
        test_path = os.path.join(test_dir, name)
        with open(os.path.join(test_path, "test.yaml")) as f:
            testcases = yaml.safe_load(f)
        for test_config in testcases:
            test_name = name + "/" + test_config.pop("name")
            execute_test(test_name, test_path, test_config, args.memgraph,
                         args.mg_import_csv, args.tester)

    sys.exit(0)

View File

@ -0,0 +1,40 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
/**
 * Executes "DUMP DATABASE" and outputs all results to stdout. On any errors it
 * exits with a non-zero exit code.
 */
// NOLINTNEXTLINE(bugprone-exception-escape)
int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  // Initialize the communication stack before opening any connections.
  communication::Init();

  io::network::Endpoint endpoint(io::network::ResolveHostname(FLAGS_address),
                                 FLAGS_port);

  communication::ClientContext context(FLAGS_use_ssl);
  communication::bolt::Client client(&context);
  client.Connect(endpoint, FLAGS_username, FLAGS_password);

  // Each dump row must contain exactly one column: the generated query.
  // (Message fixed: "Too much entries" -> "Too many entries".)
  auto ret = client.Execute("DUMP DATABASE", {});
  for (const auto &row : ret.records) {
    CHECK(row.size() == 1) << "Too many entries in query dump row (got "
                           << row.size() << ", expected 1)!";
    std::cout << row[0].ValueString() << std::endl;
  }

  return 0;
}

View File

@ -0,0 +1,7 @@
:ID,value
0,foo
1,bar
baz
1 :ID,value
2 0,foo
3 1,bar
4 baz

View File

@ -0,0 +1,3 @@
- name: empty_lines
nodes: "nodes.csv"
import_should_fail: True

View File

@ -0,0 +1,2 @@
:id,value
0,foo
1 :id value
2 0 foo

View File

@ -0,0 +1,3 @@
- name: magic_fields_lower_case
nodes: "nodes.csv"
import_should_fail: True

View File

@ -0,0 +1,2 @@
id:ID,value
0,"value
Can't render this file because it contains an unexpected character in line 2 and column 10.

View File

@ -0,0 +1,3 @@
- name: quote_not_closed
nodes: "nodes.csv"
import_should_fail: True

View File

@ -0,0 +1,18 @@
CREATE INDEX ON :__mg_vertex__(__mg_id__);
CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 0, content: "yes", browser: "Chrome", id: "0", country: "Croatia"});
CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 1, content: "thanks", browser: "Chrome", id: "1", country: "United Kingdom"});
CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 2, content: "LOL", id: "2", country: "Germany"});
CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 3, content: "I see", browser: "Firefox", id: "3", country: "France"});
CREATE (:__mg_vertex__:Message:Comment:First:Second {__mg_id__: 4, content: "fine", browser: "Internet Explorer", id: "4", country: "Italy"});
CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 5, title: "General", id: "0"});
CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 6, title: "Support", id: "1"});
CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 7, title: "Music", id: "2"});
CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 8, title: "Film", id: "3"});
CREATE (:__mg_vertex__:Forum:First:Second {__mg_id__: 9, title: "Programming", id: "4"});
MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 0 AND v.__mg_id__ = 5 CREATE (u)-[:POSTED_ON]->(v);
MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 1 AND v.__mg_id__ = 6 CREATE (u)-[:POSTED_ON]->(v);
MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 2 AND v.__mg_id__ = 7 CREATE (u)-[:POSTED_ON]->(v);
MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 3 AND v.__mg_id__ = 8 CREATE (u)-[:POSTED_ON]->(v);
MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 4 AND v.__mg_id__ = 9 CREATE (u)-[:POSTED_ON]->(v);
DROP INDEX ON :__mg_vertex__(__mg_id__);
MATCH (u) REMOVE u:__mg_vertex__, u.__mg_id__;

View File

@ -0,0 +1,7 @@
id:ID(COMMENT_ID)|country|browser:string|content:string|:LABEL
0|Croatia|Chrome|yes|Message;Comment
1|ÖUnited
KingdomÖ|Chrome|thanks|Message;Comment
2|Germany||LOL|Message;Comment
3|France|Firefox|I see|Message;Comment
4|Italy|Internet Explorer|fine|Message;Comment
1 id:ID(COMMENT_ID)|country|browser:string|content:string|:LABEL
2 0|Croatia|Chrome|yes|Message;Comment
3 1|ÖUnited
4 KingdomÖ|Chrome|thanks|Message;Comment
5 2|Germany||LOL|Message;Comment
6 3|France|Firefox|I see|Message;Comment
7 4|Italy|Internet Explorer|fine|Message;Comment

View File

@ -0,0 +1,6 @@
id:ID(FORUM_ID)|title:string|:LABEL|emptyColumn
0|General|Forum|
1|Support|Forum|
2|Music|Forum|
3|Film|Forum|
4|Programming|Forum|
1 id:ID(FORUM_ID) title:string :LABEL emptyColumn
2 0 General Forum
3 1 Support Forum
4 2 Music Forum
5 3 Film Forum
6 4 Programming Forum

View File

@ -0,0 +1,4 @@
:START_ID(COMMENT_ID)|:END_ID(FORUM_ID)|:TYPE
0|0|POSTED_ON
1|1|POSTED_ON
2|2|POSTED_ON
1 :START_ID(COMMENT_ID) :END_ID(FORUM_ID) :TYPE
2 0 0 POSTED_ON
3 1 1 POSTED_ON
4 2 2 POSTED_ON

View File

@ -0,0 +1,3 @@
:START_ID(COMMENT_ID)|:END_ID(FORUM_ID)|:TYPE
3|3|POSTED_ON
4|4|POSTED_ON
1 :START_ID(COMMENT_ID) :END_ID(FORUM_ID) :TYPE
2 3 3 POSTED_ON
3 4 4 POSTED_ON

View File

@ -0,0 +1,44 @@
- name: good_configuration
nodes:
- nodes_comment.csv
- nodes_forum.csv
node_label:
- First
- Second
relationships:
- relationships_0.csv
- relationships_1.csv
delimiter: "|"
quote: "Ö"
array_delimiter: ";"
expected: expected.cypher
- name: wrong_delimiter
nodes:
- nodes_comment.csv
- nodes_forum.csv
node_label:
- First
- Second
relationships:
- relationships_0.csv
- relationships_1.csv
delimiter: ","
quote: "Ö"
array_delimiter: ";"
import_should_fail: True
- name: wrong_quote
nodes:
- nodes_comment.csv
- nodes_forum.csv
node_label:
- First
- Second
relationships:
- relationships_0.csv
- relationships_1.csv
delimiter: "|"
quote: "\""
array_delimiter: ";"
import_should_fail: True

View File

@ -0,0 +1,11 @@
CREATE INDEX ON :__mg_vertex__(__mg_id__);
CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 0, id: "0", country: "Croatia", browser: "Chrome", content: "yes"});
CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 1, id: "1", country: "United Kingdom", browser: "Chrome", content: "thanks"});
CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 2, id: "2", country: "Germany", content: "LOL"});
CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 3, id: "3", country: "France", browser: "Firefox", content: "I see"});
CREATE (:__mg_vertex__:Message:Comment {__mg_id__: 4, id: "4", country: "Italy", browser: "Internet Explorer", content: "fine"});
MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 0 AND v.__mg_id__ = 1 CREATE (u)-[:TYPE]->(v);
MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 0 AND v.__mg_id__ = 2 CREATE (u)-[:TYPE]->(v);
MATCH (u:__mg_vertex__), (v:__mg_vertex__) WHERE u.__mg_id__ = 1 AND v.__mg_id__ = 2 CREATE (u)-[:TYPE]->(v);
DROP INDEX ON :__mg_vertex__(__mg_id__);
MATCH (u) REMOVE u:__mg_vertex__, u.__mg_id__;

View File

@ -0,0 +1,6 @@
id:ID(COMMENT_ID)|country:string|browser:string|content:string|:LABEL
0|Croatia|Chrome|yes|Message;Comment
1|"United Kingdom"|Chrome|thanks|Message;Comment
2|Germany||LOL|Message;Comment
3|France|Firefox|I see|Message;Comment
4|Italy|Internet Explorer|fine|Message;Comment
1 id:ID(COMMENT_ID) country:string browser:string content:string :LABEL
2 0 Croatia Chrome yes Message;Comment
3 1 United Kingdom Chrome thanks Message;Comment
4 2 Germany LOL Message;Comment
5 3 France Firefox I see Message;Comment
6 4 Italy Internet Explorer fine Message;Comment

View File

@ -0,0 +1,4 @@
:START_ID(COMMENT_ID)|:END_ID(COMMENT_ID)|:TYPE
0|1|LIKES
1|2|FOLLOWS
0|2|KNOWS
1 :START_ID(COMMENT_ID) :END_ID(COMMENT_ID) :TYPE
2 0 1 LIKES
3 1 2 FOLLOWS
4 0 2 KNOWS

View File

@ -0,0 +1,21 @@
- name: good_configuration
nodes:
- nodes_comment.csv
relationships:
- relationships.csv
relationship_type: "TYPE "
delimiter: "|"
quote: "\""
array_delimiter: ";"
expected: expected.cypher
- name: wrong_delimiter
nodes:
- nodes_comment.csv
relationships:
- relationships.csv
relationship_type: "TYPE "
delimiter: ","
quote: "\""
array_delimiter: ";"
import_should_fail: True

View File

@ -0,0 +1,2 @@
:ID,value
0,foo
1 :ID value
2 0 foo

View File

@ -0,0 +1,4 @@
- name: wrong_delimiter
nodes: "nodes.csv"
delimiter: "-"
import_should_fail: True

2
tools/.gitignore vendored
View File

@ -1,2 +0,0 @@
build/
mg_import_csv