Matej Ferencevic f48ad62647 Implement storage lock
The storage now uses a file in the data directory (`.lock`) to determine
whether there is another instance of the storage running with the same data
directory. That helps notify the user/administrator that the system is running
in an unsupported configuration.

Reviewers: teon.banek, ipaljak

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision:
2020-03-13 19:27:01 +01:00

781 lines
28 KiB

#include <algorithm>
#include <cstdio>
#include <filesystem>
#include <fstream>
#include <optional>
#include <regex>
#include <unordered_map>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "helpers.hpp"
#include "storage/v2/storage.hpp"
#include "utils/exceptions.hpp"
#include "utils/string.hpp"
#include "utils/timer.hpp"
#include "version.hpp"
/// gflags validator for flags whose value is used as a CSV control sequence.
///
/// @param flagname name of the flag being validated (only used in messages)
/// @param value value supplied for the flag
/// @return `true` when `value` is non-empty and contains no newline character;
///         otherwise a diagnostic is printed to stdout and `false` is returned
bool ValidateControlCharacter(const char *flagname, const std::string &value) {
  if (value.empty()) {
    printf("The argument '%s' cannot be empty\n", flagname);
    return false;
  }
  if (value.find('\n') != std::string::npos) {
    printf("The argument '%s' cannot contain a newline character\n", flagname);
    return false;
  }
  return true;
}
/// gflags validator for the ID type flag.
///
/// The value is trimmed and upper-cased before it is compared, so the check is
/// case-insensitive and ignores surrounding whitespace.
///
/// @param flagname name of the flag being validated (only used in messages)
/// @param value value supplied for the flag
/// @return `true` when the normalized value is `STRING` or `INTEGER`;
///         otherwise a diagnostic is printed to stdout and `false` is returned
bool ValidateIdTypeOptions(const char *flagname, const std::string &value) {
  std::string upper = utils::ToUpperCase(utils::Trim(value));
  if (upper != "STRING" && upper != "INTEGER") {
    printf("Valid options for '%s' are: STRING/INTEGER\n", flagname);
    return false;
  }
  return true;
}
// Memgraph flags.
// NOTE: These flags must match the flags in the main Memgraph binary.
// They are used to automatically load the same configuration as the main
// Memgraph binary so that the flags don't need to be specified when importing a
// CSV file on a correctly set-up Memgraph installation.
DEFINE_string(data_directory, "mg_data",
              "Path to directory in which to save all permanent data.");
DEFINE_bool(storage_properties_on_edges, false,
            "Controls whether relationships have properties.");

// CSV import flags.
DEFINE_string(array_delimiter, ";",
              "Delimiter between elements of array values.");
DEFINE_validator(array_delimiter, &ValidateControlCharacter);
DEFINE_string(delimiter, ",", "Delimiter between each field in the CSV.");
DEFINE_validator(delimiter, &ValidateControlCharacter);
// The backslash is escaped so that the help text shows the two characters
// `\n` instead of breaking the line in the middle of the sentence.
DEFINE_string(quote, "\"",
              "Quotation character for data in the CSV. Cannot contain '\\n'");
DEFINE_validator(quote, &ValidateControlCharacter);
DEFINE_bool(skip_duplicate_nodes, false,
            "Set to true to skip duplicate nodes instead of raising an error.");
DEFINE_bool(skip_bad_relationships, false,
            "Set to true to skip relationships that connect nodes that don't "
            "exist instead of raising an error.");
DEFINE_bool(ignore_empty_strings, false,
            "Set to true to treat empty strings as null values.");
DEFINE_bool(
    ignore_extra_columns, false,
    "Set to true to ignore columns that aren't specified in the header.");
DEFINE_bool(trim_strings, false,
            "Set to true to trim leading/trailing whitespace from all fields "
            "that are loaded from the CSV file.");
DEFINE_string(id_type, "STRING",
              "Which data type should be used to store the supplied node IDs. "
              "Possible options are: STRING/INTEGER");
DEFINE_validator(id_type, &ValidateIdTypeOptions);

// Arguments `--nodes` and `--relationships` can be input multiple times and are
// handled with custom parsing.
DEFINE_string(
    nodes, "",
    "Files that should be parsed for nodes. The CSV header will be loaded from "
    "the first supplied file, all other files supplied in a single flag will "
    "be treated as data files. Additional labels can be specified for the node "
    "files. The flag can be specified multiple times (useful for differently "
    "formatted node files). The format of this argument is: "
    "[<label>[:<label>]...=]<file>[,<file>][,<file>]...");
DEFINE_string(
    relationships, "",
    "Files that should be parsed for relationships. The CSV header will be "
    "loaded from the first supplied file, all other files supplied in a single "
    "flag will be treated as data files. The relationship type can be "
    "specified for the relationship files. The flag can be specified multiple "
    "times (useful for differently formatted relationship files). The format "
    "of this argument is: [<type>=]<file>[,<file>][,<file>]...");
/// Collects every value supplied for a flag that may be repeated on the
/// command line.
///
/// Both `-flag` and `--flag` spellings are recognized, and the value can be
/// given either as `--flag=value` or as the following argument
/// (`--flag value`). The check aborts the program when a matched flag has no
/// value.
///
/// @param flagname name of the flag without leading dashes
/// @param argc number of entries in `argv`
/// @param argv raw command line arguments (`argv[0]` is skipped)
/// @return all values of the flag in the order in which they were supplied
std::vector<std::string> ParseRepeatedFlag(const std::string &flagname,
                                           int argc, char *argv[]) {
  std::vector<std::string> values;
  for (int i = 1; i < argc; ++i) {
    std::string flag(argv[i]);
    int matched_flag_dashes = 0;
    if (utils::StartsWith(flag, "--" + flagname))
      matched_flag_dashes = 2;
    else if (utils::StartsWith(flag, "-" + flagname))
      matched_flag_dashes = 1;
    // Get the value if we matched the flag.
    if (matched_flag_dashes != 0) {
      std::string value;
      auto maybe_value = flag.substr(flagname.size() + matched_flag_dashes);
      if (maybe_value.empty() && i + 1 < argc)
        value = argv[++i];
      else if (!maybe_value.empty() && maybe_value.front() == '=')
        value = maybe_value.substr(1);
      CHECK(!value.empty()) << "The argument '" << flagname << "' is required";
      // Without this the function would always return an empty list.
      values.push_back(std::move(value));
    }
  }
  return values;
}
// A field describing the CSV column.
struct Field {
  // Name of the field.
  std::string name;
  // Type of the values under this field.
  std::string type;
};
// A node ID from CSV format.
struct NodeId {
  // The ID exactly as it was read from the CSV file.
  std::string id;
  // Group/space of IDs. ID must be unique in a single group.
  std::string id_space;
};
/// Two node IDs are equal only when both the ID and its ID space are equal.
bool operator==(const NodeId &a, const NodeId &b) {
  return a.id == b.id && a.id_space == b.id_space;
}
std::ostream &operator<<(std::ostream &stream, const NodeId &node_id) {
if (!node_id.id_space.empty()) {
return stream << << "(" << node_id.id_space << ")";
} else {
return stream <<;
namespace std {
/// Hash support so that `NodeId` can be used as a key in unordered containers.
template <>
struct hash<NodeId> {
  size_t operator()(const NodeId &node_id) const {
    size_t id_hash = std::hash<std::string>{}(node_id.id);
    size_t id_space_hash = std::hash<std::string>{}(node_id.id_space);
    // Shift one of the hashes so that swapping `id` and `id_space` doesn't
    // produce the same combined value.
    return id_hash ^ (id_space_hash << 1UL);
  }
};
}  // namespace std
// Exception used to indicate that something went wrong during data loading.
class LoadException : public utils::BasicException {
using utils::BasicException::BasicException;
/// States of the CSV row parser.
enum class CsvParserState {
  // Nothing has been read from the row yet.
  INITIAL_FIELD,
  // A delimiter was just consumed; the next field is about to start.
  NEXT_FIELD,
  // Inside a quoted field.
  QUOTING,
  // Inside an unquoted field.
  NOT_QUOTING,
  // A quoted field was just closed; only a delimiter may follow.
  EXPECT_DELIMITER,
};
/// Checks whether the part of `str` that begins at offset `pos` starts with
/// `what`.
bool SubstringStartsWith(const std::string_view &str, size_t pos,
                         const std::string_view &what) {
  return utils::StartsWith(utils::Substr(str, pos), what);
}
/// This function reads a row from a CSV stream.
///
/// Each CSV field must be divided using the `delimiter` and each CSV field can
/// either be quoted or unquoted. When the field is quoted, the first and last
/// character in the field *must* be the quote character. If the field isn't
/// quoted, and a quote character appears in it, it is treated as a regular
/// character. If a quote character appears inside a quoted string then the
/// quote character must be doubled in order to escape it. Line feeds and
/// carriage returns are ignored in the CSV file, also, the file can't contain a
/// NULL character.
///
/// The function uses the same logic as the standard Python CSV parser. The data
/// is parsed in the same way as the following snippet:
/// ```
/// import csv
/// for row in csv.reader(stream, strict=True):
///     # process `row`
/// ```
///
/// Python uses 'excel' as the default dialect when parsing CSV files and the
/// default settings for the CSV parser are:
///  - delimiter: ','
///  - doublequote: True
///  - escapechar: None
///  - lineterminator: '\r\n'
///  - quotechar: '"'
///  - skipinitialspace: False
///
/// The above snippet can be expanded to:
/// ```
/// import csv
/// for row in csv.reader(stream, delimiter=',', doublequote=True,
///                       escapechar=None, lineterminator='\r\n',
///                       quotechar='"', skipinitialspace=False,
///                       strict=True):
///     # process `row`
/// ```
///
/// For more information about the meaning of the above values, see the
/// "Dialects and Formatting Parameters" section of the Python `csv` module
/// documentation.
///
/// @param stream input stream positioned at the beginning of a row
/// @return the parsed fields together with the number of physical lines that
///         were consumed (a quoted field may span several lines); the line
///         count is 0 when the stream had no more data
/// @throw LoadException
std::pair<std::vector<std::string>, uint64_t> ReadRow(std::istream &stream) {
  std::vector<std::string> row;
  std::string column;
  uint64_t lines_count = 0;

  auto state = CsvParserState::INITIAL_FIELD;

  do {
    std::string line;
    if (!std::getline(stream, line)) {
      // The whole file was processed.
      break;
    }
    ++lines_count;

    for (size_t i = 0; i < line.size(); ++i) {
      auto c = line[i];

      // Line feeds and carriage returns are ignored in CSVs.
      if (c == '\n' || c == '\r') continue;
      // Null bytes aren't allowed in CSVs.
      if (c == '\0') throw LoadException("Line contains NULL byte");

      switch (state) {
        case CsvParserState::INITIAL_FIELD:
        case CsvParserState::NEXT_FIELD: {
          if (SubstringStartsWith(line, i, FLAGS_quote)) {
            // The current field is a quoted field.
            state = CsvParserState::QUOTING;
            i += FLAGS_quote.size() - 1;
          } else if (SubstringStartsWith(line, i, FLAGS_delimiter)) {
            // The current field has an empty value.
            row.emplace_back("");
            state = CsvParserState::NEXT_FIELD;
            i += FLAGS_delimiter.size() - 1;
          } else {
            // The current field is a regular field.
            column.push_back(c);
            state = CsvParserState::NOT_QUOTING;
          }
          break;
        }
        case CsvParserState::QUOTING: {
          auto quote_now = SubstringStartsWith(line, i, FLAGS_quote);
          auto quote_next =
              SubstringStartsWith(line, i + FLAGS_quote.size(), FLAGS_quote);
          if (quote_now && quote_next) {
            // This is an escaped quote character.
            column += FLAGS_quote;
            i += FLAGS_quote.size() * 2 - 1;
          } else if (quote_now && !quote_next) {
            // This is the end of the quoted field.
            row.emplace_back(std::move(column));
            column.clear();
            state = CsvParserState::EXPECT_DELIMITER;
            i += FLAGS_quote.size() - 1;
          } else {
            column.push_back(c);
          }
          break;
        }
        case CsvParserState::NOT_QUOTING: {
          if (SubstringStartsWith(line, i, FLAGS_delimiter)) {
            row.emplace_back(std::move(column));
            column.clear();
            state = CsvParserState::NEXT_FIELD;
            i += FLAGS_delimiter.size() - 1;
          } else {
            column.push_back(c);
          }
          break;
        }
        case CsvParserState::EXPECT_DELIMITER: {
          if (SubstringStartsWith(line, i, FLAGS_delimiter)) {
            state = CsvParserState::NEXT_FIELD;
            i += FLAGS_delimiter.size() - 1;
          } else {
            throw LoadException("Expected '{}' after '{}', but got '{}'",
                                FLAGS_delimiter, FLAGS_quote, c);
          }
          break;
        }
      }
    }
    // A quoted field that is still open continues on the next physical line.
  } while (state == CsvParserState::QUOTING);

  // Flush whatever the final state implies about the last field of the row.
  switch (state) {
    case CsvParserState::INITIAL_FIELD: {
      // Nothing was read, there is nothing to emit.
      break;
    }
    case CsvParserState::NEXT_FIELD: {
      // The row ended right after a delimiter, the last field is empty.
      row.emplace_back("");
      break;
    }
    case CsvParserState::QUOTING: {
      throw LoadException(
          "There is no more data left to load while inside a quoted string. "
          "Did you forget to close the quote?");
    }
    case CsvParserState::NOT_QUOTING: {
      row.emplace_back(std::move(column));
      break;
    }
    case CsvParserState::EXPECT_DELIMITER: {
      // The closing quote already emitted the field.
      break;
    }
  }

  if (FLAGS_trim_strings) {
    for (size_t i = 0; i < row.size(); ++i) {
      std::string trimmed(utils::Trim(row[i]));
      row[i] = std::move(trimmed);
    }
  }

  return {std::move(row), lines_count};
}
/// @throw LoadException
std::pair<std::vector<Field>, uint64_t> ReadHeader(std::istream &stream) {
auto [row, lines_count] = ReadRow(stream);
std::vector<Field> fields;
for (const auto &value : row) {
auto name_and_type = utils::Split(value, ":");
if (name_and_type.size() != 1U && name_and_type.size() != 2U)
throw LoadException(
"Expected a name and optionally a type, got '{}'. Did you specify a "
"correct CSV delimiter?",
auto name = name_and_type[0];
// When type is missing, default is string.
std::string type("string");
if (name_and_type.size() == 2U) type = utils::Trim(name_and_type[1]);
fields.push_back(Field{name, type});
return {std::move(fields), lines_count};
/// Converts `value` to an integer.
///
/// Any exception raised by the underlying parser is translated into a
/// `LoadException` that carries the offending value.
///
/// @throw LoadException
int64_t StringToInt(const std::string &value) {
  try {
    return utils::ParseInt(value);
  } catch (...) {
    throw LoadException("'{}' isn't a valid integer", value);
  }
}
/// Converts `value` to a floating-point number.
///
/// Any exception raised by the underlying parser is translated into a
/// `LoadException` that carries the offending value.
///
/// @throw LoadException
double StringToDouble(const std::string &value) {
  try {
    return utils::ParseDouble(value);
  } catch (...) {
    throw LoadException("'{}' isn't a valid floating-point value", value);
  }
}
/// Converts a CSV field to a property value of the type named in the header.
///
/// A type that ends with `[]` denotes an array: the field is split on the
/// array delimiter flag and every element is converted using the element
/// type. When the ignore-empty-strings flag is set, an empty field yields a
/// default-constructed property value regardless of the type.
///
/// @param str raw field contents
/// @param type type name taken from the CSV header
/// @throw LoadException
storage::PropertyValue StringToValue(const std::string &str,
                                     const std::string &type) {
  if (FLAGS_ignore_empty_strings && str.empty())
    return storage::PropertyValue();

  // Converts a single (non-array) value.
  auto convert = [](const auto &str, const auto &type) {
    if (type == "integer" || type == "int" || type == "long" ||
        type == "byte" || type == "short") {
      return storage::PropertyValue(StringToInt(str));
    } else if (type == "float" || type == "double") {
      return storage::PropertyValue(StringToDouble(str));
    } else if (type == "boolean" || type == "bool") {
      // Anything other than a case-insensitive "true" is treated as false.
      if (utils::ToLowerCase(str) == "true") {
        return storage::PropertyValue(true);
      } else {
        return storage::PropertyValue(false);
      }
    } else if (type == "char" || type == "string") {
      return storage::PropertyValue(str);
    }
    throw LoadException("Unexpected type: {}", type);
  };

  // Type *not* ending with '[]', signifies regular value.
  if (!utils::EndsWith(type, "[]")) return convert(str, type);

  // Otherwise, we have an array type.
  auto elem_type = type.substr(0, type.size() - 2);
  auto elems = utils::Split(str, FLAGS_array_delimiter);
  std::vector<storage::PropertyValue> array;
  array.reserve(elems.size());
  for (const auto &elem : elems) {
    array.push_back(convert(elem, elem_type));
  }
  return storage::PropertyValue(std::move(array));
}
/// Extracts the ID space from the type of an ID column.
///
/// The whole `type` must match the pattern below; the value of the third
/// capture group (the text between the parentheses) is returned.
///
/// NOTE(review): in this copy of the file the `std::regex` constructor call
/// ends with a trailing comma, the argument for the '{}' placeholder of the
/// exception message is absent and the closing braces are missing -- restore
/// them from the upstream file before building.
///
/// @throw LoadException
std::string GetIdSpace(const std::string &type) {
// The format of this field is as follows:
// [START_|END_]ID[(<id_space>)]
// Capture groups: 1 = `START_`/`END_` prefix, 2 = `(<id_space>)`,
// 3 = `<id_space>`.
std::regex format(R"(^(START_|END_)?ID(\(([^\(\)]+)\))?$)",
std::smatch res;
if (!std::regex_match(type, res, format))
throw LoadException(
"Expected the ID field to look like '[START_|END_]ID[(<id_space>)]', "
"but got '{}' instead",
// The full match plus the three capture groups give four sub-matches.
CHECK(res.size() == 4) << "Invalid regex match result!";
return res[3];
/// Creates one vertex from a CSV row and commits it to the storage.
///
/// Columns are interpreted according to the type of the matching header
/// field:
///  - a type starting with `ID` registers the node in `node_id_map` and, when
///    the field has a name, also stores the ID as a property of that name
///  - `LABEL` adds every label from the array-delimited list
///  - `IGNORE` skips the column
///  - any other type is stored as a property named after the field
///
/// @param store storage into which the vertex is inserted
/// @param fields header fields; must have at least `row.size()` entries
/// @param row values of the row
/// @param additional_labels labels added to the vertex on top of the ones
///                          found in the row
/// @param node_id_map mapping from CSV node IDs to storage GIDs, updated with
///                    the ID of the new vertex
/// @throw LoadException
void ProcessNodeRow(storage::Storage *store, const std::vector<Field> &fields,
                    const std::vector<std::string> &row,
                    const std::vector<std::string> &additional_labels,
                    std::unordered_map<NodeId, storage::Gid> *node_id_map) {
  std::optional<NodeId> id;
  auto acc = store->Access();
  auto node = acc.CreateVertex();
  for (size_t i = 0; i < row.size(); ++i) {
    const auto &field = fields[i];
    const auto &value = row[i];
    if (utils::StartsWith(field.type, "ID")) {
      if (id) throw LoadException("Only one node ID must be specified");
      if (FLAGS_id_type == "INTEGER") {
        // Call `StringToInt` to verify that the ID is a valid integer.
        StringToInt(value);
      }
      NodeId node_id{value, GetIdSpace(field.type)};
      auto it = node_id_map->find(node_id);
      if (it != node_id_map->end()) {
        if (FLAGS_skip_duplicate_nodes) {
          LOG(WARNING) << "Skipping duplicate node with ID '" << node_id << "'";
          // Returning without a commit leaves the created vertex uncommitted.
          return;
        } else {
          throw LoadException("Node with ID '{}' already exists", node_id);
        }
      }
      node_id_map->emplace(node_id, node.Gid());
      // The ID is stored as a property only when the ID column is named.
      if (!field.name.empty()) {
        storage::PropertyValue pv_id;
        if (FLAGS_id_type == "INTEGER") {
          pv_id = storage::PropertyValue(StringToInt(node_id.id));
        } else {
          pv_id = storage::PropertyValue(node_id.id);
        }
        auto node_property =
            node.SetProperty(acc.NameToProperty(field.name), pv_id);
        if (!node_property.HasValue())
          throw LoadException("Couldn't add property '{}' to the node",
                              field.name);
        if (!*node_property)
          throw LoadException("The property '{}' already exists", field.name);
      }
      id = node_id;
    } else if (field.type == "LABEL") {
      for (const auto &label : utils::Split(value, FLAGS_array_delimiter)) {
        auto node_label = node.AddLabel(acc.NameToLabel(label));
        if (!node_label.HasValue())
          throw LoadException("Couldn't add label '{}' to the node", label);
        if (!*node_label)
          throw LoadException("The label '{}' already exists", label);
      }
    } else if (field.type != "IGNORE") {
      auto node_property = node.SetProperty(acc.NameToProperty(field.name),
                                            StringToValue(value, field.type));
      if (!node_property.HasValue())
        throw LoadException("Couldn't add property '{}' to the node",
                            field.name);
      if (!*node_property)
        throw LoadException("The property '{}' already exists", field.name);
    }
  }
  for (const auto &label : additional_labels) {
    auto node_label = node.AddLabel(acc.NameToLabel(label));
    if (!node_label.HasValue())
      throw LoadException("Couldn't add label '{}' to the node", label);
    if (!*node_label)
      throw LoadException("The label '{}' already exists", label);
  }
  if (acc.Commit().HasError()) throw LoadException("Couldn't store the node");
}
/// Loads all nodes from a single CSV file into the storage.
///
/// When `*header` is empty the first row of the file is parsed as the header
/// and stored in `*header` so that the following files of the same flag reuse
/// it. Any `LoadException` is reported together with the row number and
/// terminates the program.
///
/// @param store storage into which the nodes are inserted
/// @param nodes_path path to the CSV file
/// @param header header shared by all files supplied in one `--nodes` flag
/// @param node_id_map mapping from CSV node IDs to storage GIDs
/// @param additional_labels labels added to every node from this file
void ProcessNodes(storage::Storage *store, const std::string &nodes_path,
                  std::optional<std::vector<Field>> *header,
                  std::unordered_map<NodeId, storage::Gid> *node_id_map,
                  const std::vector<std::string> &additional_labels) {
  std::ifstream nodes_file(nodes_path);
  CHECK(nodes_file) << "Unable to open '" << nodes_path << "'";
  uint64_t row_number = 1;
  try {
    if (!*header) {
      auto [fields, header_lines] = ReadHeader(nodes_file);
      row_number += header_lines;
      // Remember the header, the rows below are validated against it.
      *header = std::move(fields);
    }
    while (true) {
      auto [row, lines_count] = ReadRow(nodes_file);
      if (lines_count == 0) break;
      if ((!FLAGS_ignore_extra_columns && row.size() != (*header)->size()) ||
          (FLAGS_ignore_extra_columns && row.size() < (*header)->size()))
        throw LoadException(
            "Expected as many values as there are header fields (found {}, "
            "expected {})",
            row.size(), (*header)->size());
      if (row.size() > (*header)->size()) {
        // Only reachable when extra columns are ignored; drop them so that
        // every remaining value has a matching header field.
        row.resize((*header)->size());
      }
      ProcessNodeRow(store, **header, row, additional_labels, node_id_map);
      row_number += lines_count;
    }
  } catch (const LoadException &e) {
    LOG(FATAL) << "Couldn't process row " << row_number << " of '" << nodes_path
               << "' because of: " << e.what();
  }
}
/// Creates one relationship (edge) from a CSV row and commits it.
///
/// Columns are interpreted according to the type of the matching header field:
/// types starting with `START_ID` / `END_ID` are looked up in `node_id_map`,
/// `TYPE` supplies the relationship type, `IGNORE` skips the column and every
/// other type is collected as a property of the relationship.
///
/// NOTE(review): this copy of the file is missing statements and tokens in
/// this function: the closing braces, the `StringToInt` validation calls that
/// the comments announce, whatever follows the "Skipping bad relationship"
/// warnings, the property-name arguments, and the last argument of
/// `CreateEdge` / `SetProperty`. Restore them from the upstream file; the
/// comments below describe only what is visible here.
///
/// @throw LoadException
void ProcessRelationshipsRow(
storage::Storage *store, const std::vector<Field> &fields,
const std::vector<std::string> &row,
std::optional<std::string> relationship_type,
const std::unordered_map<NodeId, storage::Gid> &node_id_map) {
// GIDs of the endpoints, resolved from the CSV IDs through `node_id_map`.
std::optional<storage::Gid> start_id;
std::optional<storage::Gid> end_id;
// Properties are collected first and applied once the edge is created.
std::map<std::string, storage::PropertyValue> properties;
for (size_t i = 0; i < row.size(); ++i) {
const auto &field = fields[i];
const auto &value = row[i];
if (utils::StartsWith(field.type, "START_ID")) {
if (start_id) throw LoadException("Only one node ID must be specified");
if (FLAGS_id_type == "INTEGER") {
// Call `StringToInt` to verify that the START_ID is a valid integer.
NodeId node_id{value, GetIdSpace(field.type)};
auto it = node_id_map.find(node_id);
if (it == node_id_map.end()) {
if (FLAGS_skip_bad_relationships) {
LOG(WARNING) << "Skipping bad relationship with START_ID '" << node_id
<< "'";
} else {
throw LoadException("Node with ID '{}' does not exist", node_id);
start_id = it->second;
} else if (utils::StartsWith(field.type, "END_ID")) {
if (end_id) throw LoadException("Only one node ID must be specified");
if (FLAGS_id_type == "INTEGER") {
// Call `StringToInt` to verify that the END_ID is a valid integer.
NodeId node_id{value, GetIdSpace(field.type)};
auto it = node_id_map.find(node_id);
if (it == node_id_map.end()) {
if (FLAGS_skip_bad_relationships) {
LOG(WARNING) << "Skipping bad relationship with END_ID '" << node_id
<< "'";
} else {
throw LoadException("Node with ID '{}' does not exist", node_id);
end_id = it->second;
} else if (field.type == "TYPE") {
// `relationship_type` may already be set by the caller, in which case a
// TYPE column in the file is rejected as a second type.
if (relationship_type)
throw LoadException("Only one relationship TYPE must be specified");
relationship_type = value;
} else if (field.type != "IGNORE") {
auto [it, inserted] =
properties.emplace(, StringToValue(value, field.type));
if (!inserted)
throw LoadException("The property '{}' already exists",;
// All three pieces are mandatory before the edge can be created.
if (!start_id) throw LoadException("START_ID must be set");
if (!end_id) throw LoadException("END_ID must be set");
if (!relationship_type) throw LoadException("Relationship TYPE must be set");
auto acc = store->Access();
auto from_node = acc.FindVertex(*start_id, storage::View::NEW);
if (!from_node) throw LoadException("From node must be in the storage");
auto to_node = acc.FindVertex(*end_id, storage::View::NEW);
if (!to_node) throw LoadException("To node must be in the storage");
auto relationship = acc.CreateEdge(&*from_node, &*to_node,
if (!relationship.HasValue())
throw LoadException("Couldn't create the relationship");
for (const auto &property : properties) {
auto ret = relationship->SetProperty(acc.NameToProperty(property.first),
if (!ret.HasValue()) {
// A dedicated message is produced when edge properties are disabled.
if (ret.GetError() != storage::Error::PROPERTIES_DISABLED) {
throw LoadException("Couldn't add property '{}' to the relationship",
} else {
throw LoadException(
"Couldn't add property '{}' to the relationship because properties "
"on edges are disabled",
if (acc.Commit().HasError())
throw LoadException("Couldn't store the relationship");
/// Loads all relationships from a single CSV file into the storage.
///
/// When `*header` is empty the first row of the file is parsed as the header
/// and stored in `*header` so that the following files of the same flag reuse
/// it. Any `LoadException` is reported together with the row number and
/// terminates the program.
///
/// @param store storage into which the relationships are inserted
/// @param relationships_path path to the CSV file
/// @param relationship_type optional type applied to every relationship from
///                          this file
/// @param header header shared by all files supplied in one
///               `--relationships` flag
/// @param node_id_map mapping from CSV node IDs to storage GIDs
void ProcessRelationships(
    storage::Storage *store, const std::string &relationships_path,
    const std::optional<std::string> &relationship_type,
    std::optional<std::vector<Field>> *header,
    const std::unordered_map<NodeId, storage::Gid> &node_id_map) {
  std::ifstream relationships_file(relationships_path);
  CHECK(relationships_file) << "Unable to open '" << relationships_path << "'";
  uint64_t row_number = 1;
  try {
    if (!*header) {
      auto [fields, header_lines] = ReadHeader(relationships_file);
      row_number += header_lines;
      // Remember the header, the rows below are validated against it.
      *header = std::move(fields);
    }
    while (true) {
      auto [row, lines_count] = ReadRow(relationships_file);
      if (lines_count == 0) break;
      if ((!FLAGS_ignore_extra_columns && row.size() != (*header)->size()) ||
          (FLAGS_ignore_extra_columns && row.size() < (*header)->size()))
        throw LoadException(
            "Expected as many values as there are header fields (found {}, "
            "expected {})",
            row.size(), (*header)->size());
      if (row.size() > (*header)->size()) {
        // Only reachable when extra columns are ignored; drop them so that
        // every remaining value has a matching header field.
        row.resize((*header)->size());
      }
      ProcessRelationshipsRow(store, **header, row, relationship_type,
                              node_id_map);
      row_number += lines_count;
    }
  } catch (const LoadException &e) {
    LOG(FATAL) << "Couldn't process row " << row_number << " of '"
               << relationships_path << "' because of: " << e.what();
  }
}
// Parsed form of a single `--nodes` argument.
struct NodesArgument {
  // List of all files that should be processed for nodes.
  std::vector<std::string> nodes;
  // List of all additional labels that should be added to the nodes.
  std::vector<std::string> additional_labels;
};
/// Splits the value of a `--nodes` flag into its files and optional labels.
///
/// Everything in front of the first `=` is a `:`-separated list of additional
/// labels; the rest is a `,`-separated list of files.
NodesArgument ParseNodesArgument(const std::string &value) {
  // The format of this argument is as follows:
  //   [<label>[:<label>]...=]<file>[,<file>][,<file>]...
  std::vector<std::string> nodes;
  std::vector<std::string> additional_labels;
  size_t pos_nodes = 0;
  auto pos_equal = value.find('=');
  if (pos_equal != std::string::npos) {
    // We have additional labels.
    additional_labels = utils::Split(value.substr(0, pos_equal), ":");
    pos_nodes = pos_equal + 1;
  }
  nodes = utils::Split(value.substr(pos_nodes), ",");
  return {std::move(nodes), std::move(additional_labels)};
}
// Parsed form of a single `--relationships` argument.
struct RelationshipsArgument {
  // List of all files that should be processed for relationships.
  std::vector<std::string> relationships;
  // Optional type of the relationships.
  std::optional<std::string> type;
};
/// Splits the value of a `--relationships` flag into its files and optional
/// relationship type.
///
/// Everything in front of the first `=` is the relationship type; the rest is
/// a `,`-separated list of files.
RelationshipsArgument ParseRelationshipsArgument(const std::string &value) {
  // The format of this argument is as follows:
  //   [<type>=]<file>[,<file>][,<file>]...
  std::vector<std::string> relationships;
  std::optional<std::string> type;
  size_t pos_relationships = 0;
  auto pos_equal = value.find('=');
  if (pos_equal != std::string::npos) {
    // The type has been specified.
    type = value.substr(0, pos_equal);
    pos_relationships = pos_equal + 1;
  }
  relationships = utils::Split(value.substr(pos_relationships), ",");
  return {std::move(relationships), std::move(type)};
}
// Entry point: parses the flags, loads every nodes file and then every
// relationships file into a fresh storage instance.
//
// NOTE(review): this copy of the file is missing tokens in this function: the
// value assigned to `.snapshot_wal_mode`, the closing braces of the storage
// configuration and of the loops, and the trailing arguments of the
// `ProcessNodes` / `ProcessRelationships` calls. The comment about loading the
// config is also not followed by any visible call. Restore these from the
// upstream file before building.
int main(int argc, char *argv[]) {
gflags::SetUsageMessage("Create a Memgraph recovery snapshot file from CSV.");
// The repeatable flags are extracted from the raw arguments before gflags
// parses the command line.
auto nodes = ParseRepeatedFlag("nodes", argc, argv);
auto relationships = ParseRepeatedFlag("relationships", argc, argv);
// Load config before parsing arguments, so that flags from the command line
// overwrite the config.
gflags::ParseCommandLineFlags(&argc, &argv, true);
CHECK(!nodes.empty()) << "The --nodes flag is required!";
// Normalize the ID type so that later comparisons can use the exact strings
// "STRING" / "INTEGER".
std::string upper = utils::ToUpperCase(utils::Trim(FLAGS_id_type));
FLAGS_id_type = upper;
// Shared by all nodes and relationships files so that relationships can refer
// to nodes loaded from any file.
std::unordered_map<NodeId, storage::Gid> node_id_map;
storage::Storage store{
{.durability =
{.storage_directory = FLAGS_data_directory,
.recover_on_startup = false,
.snapshot_wal_mode =
.snapshot_on_exit = true},
.items = {
.properties_on_edges = FLAGS_storage_properties_on_edges,
utils::Timer load_timer;
// Process all nodes files.
for (const auto &value : nodes) {
auto [files, additional_labels] = ParseNodesArgument(value);
// One header per flag: it is read from the first file of the flag.
std::optional<std::vector<Field>> header;
for (const auto &nodes_file : files) {
LOG(INFO) << "Loading " << nodes_file;
ProcessNodes(&store, nodes_file, &header, &node_id_map,
// Process all relationships files.
for (const auto &value : relationships) {
auto [files, type] = ParseRelationshipsArgument(value);
// One header per flag: it is read from the first file of the flag.
std::optional<std::vector<Field>> header;
for (const auto &relationships_file : files) {
LOG(INFO) << "Loading " << relationships_file;
ProcessRelationships(&store, relationships_file, type, &header,
double load_sec = load_timer.Elapsed().count();
LOG(INFO) << "Loaded all data in " << fmt::format("{:.3f}", load_sec) << " s";
// The snapshot will be created in the storage destructor.
return 0;