From e4dcfef91ab616f9dbb55d5aab115c67b6f6bcd7 Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Wed, 16 Aug 2017 15:32:48 +0200 Subject: [PATCH] Add CSV to Memgraph snapshot file converter Reviewers: mferencevic, buda Reviewed By: mferencevic, buda Differential Revision: https://phabricator.memgraph.io/D673 --- tools/csv_to_snapshot | 344 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 344 insertions(+) create mode 100755 tools/csv_to_snapshot diff --git a/tools/csv_to_snapshot b/tools/csv_to_snapshot new file mode 100755 index 000000000..5a84abafa --- /dev/null +++ b/tools/csv_to_snapshot @@ -0,0 +1,344 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +''' +Create a Memgraph recovery snapshot file from CSV. +''' + +import argparse +import csv +import itertools as it +import logging +import struct + +log = logging.getLogger(__name__) + +_CSV_TYPE_TO_PY_TYPE = { + 'int': int, + 'long': int, + 'float': float, + 'double': float, + 'boolean': bool, + 'byte': int, + 'short': int, + 'char': str, + 'string': str, + } + + +def csv_to_py_val(value, csv_type, array_delimiter): + if not csv_type.endswith('[]'): + return _CSV_TYPE_TO_PY_TYPE[csv_type](value) + # Otherwise we have an array type, so convert it to a list. + csv_type = csv_type[:-2] + py_type = _CSV_TYPE_TO_PY_TYPE[csv_type] + return [py_type(val.strip()) for val in value.split(array_delimiter)] + + +class NodeId: + def __init__(self, id_, id_space): + if not id_: + raise ValueError('ID must not be empty') + self.id = id_ + self.id_space = id_space + + def __eq__(self, other): + if not isinstance(other, NodeId): + return NotImplemented + return self.id == other.id and self.id_space == other.id_space + + def __hash__(self): + return hash((self.id, self.id_space)) + + def __str__(self): + if self.id_space is None: + return self.id + return '{}({})'.format(self.id, self.id_space) + + +class Hasher: + '''Implementation of memgraph/src/durability/hasher. + The API mimics hashlib, so that it will be easier to switch to something + more sane (e.g. sha256).''' + _PRIME = 3137 + + def __init__(self): + self._hash = 0 + + def update(self, data): + if not isinstance(data, bytes): + raise TypeError("Expected 'bytes', but got '{}'" + .format(type(data).__name__)) + for byte in data: + self._hash = self._hash * self._PRIME + byte + 1 + + def digest(self): + '''Return the digest value as an int (which fits in uint64_t) and + *not* as bytes. (This is different from hashlib objects.)''' + return self._hash % (2**64) + + +class BoltEncoder: + # Type markers + _NULL_MARKER = b'\xC0' + _FLOAT64_MARKER = b'\xC1' + _FALSE_MARKER = b'\xC2' + _TRUE_MARKER = b'\xC3' + _INT64_MARKER = b'\xCB' + _STRING32_MARKER = b'\xD2' + _LIST32_MARKER = b'\xD6' + _MAP32_MARKER = b'\xDA' + _NODE_MARKER = b'\xB3\x4E' + _RELATIONSHIP_MARKER = b'\xB5\x52' + # Struct formats + _INT64_STRUCT = struct.Struct('>q') + _UINT32_STRUCT = struct.Struct('>I') + _UINT64_STRUCT = struct.Struct('>Q') + _FLOAT64_STRUCT = struct.Struct('>d') + + def __init__(self, file, hasher, skip_duplicate_nodes): + self._file = file + self._hasher = hasher + self._relationship_id = 0 + self._node_id = 0 + self._csv_to_mg_node_id = {} + self._skip_duplicate_nodes = skip_duplicate_nodes + + def write(self, value): + if value is None: + return self.write_null() + write = getattr(self, 'write_' + type(value).__name__) + write(value) + + def write_null(self): + self._write(self._NULL_MARKER) + + def write_bool(self, value): + if value: + self._write(self._TRUE_MARKER) + else: + self._write(self._FALSE_MARKER) + + def write_int(self, value): + self._write(self._INT64_MARKER) + self._write(self._INT64_STRUCT.pack(value)) + + def write_float(self, value): + self._write(self._FLOAT64_MARKER) + self._write(self._FLOAT64_STRUCT.pack(value)) + + def write_str(self, value): + self._write(self._STRING32_MARKER) + self._write(self._UINT32_STRUCT.pack(len(value))) + self._write(value.encode('utf-8')) + + def write_list(self, values): + self._write(self._LIST32_MARKER) + self._write(self._UINT32_STRUCT.pack(len(values))) + for value in values: + self.write(value) + + def write_dict(self, dict_value): + self._write(self._MAP32_MARKER) + self._write(self._UINT32_STRUCT.pack(len(dict_value))) + for key, value in dict_value.items(): + self.write_str(key) + self.write(value) + + def write_summary(self, node_count, relationship_count): + # It's a bit silly that the summary isn't considered for hashing + # (see: memgraph/src/durability/file_writer_buffer) + self._write(self._UINT64_STRUCT.pack(node_count), update_hash=False) + self._write(self._UINT64_STRUCT.pack(relationship_count), + update_hash=False) + self._write(self._UINT64_STRUCT.pack(self._hasher.digest()), + update_hash=False) + + def write_node(self, node_id, labels, properties): + id_ = None + try: + id_ = self._add_node_id(node_id) + except ValueError: + if self._skip_duplicate_nodes: + return + else: + raise + self._write(self._NODE_MARKER) + self.write_int(id_) + self.write_list(labels) + self.write_dict(properties) + + def write_relationship(self, start_id, end_id, type_, properties): + self._write(self._RELATIONSHIP_MARKER) + self.write_int(self._relationship_id) + self.write_int(self._csv_to_mg_node_id[start_id]) + self.write_int(self._csv_to_mg_node_id[end_id]) + self._relationship_id += 1 + self.write_str(type_) + self.write_dict(properties) + + def _write(self, byte_data, update_hash=True): + log.debug("Writing bytes '0x{}'".format(byte_data.hex())) + if update_hash: + self._hasher.update(byte_data) + self._file.write(byte_data) + + def _add_node_id(self, node_id): + '''Add a new mapping from CSV node ID to Memgraph ID and + return the Memgraph ID.''' + if node_id in self._csv_to_mg_node_id: + raise ValueError("Node '{}' already exists".format(node_id)) + id_ = self._node_id + self._csv_to_mg_node_id[node_id] = id_ + self._node_id += 1 + return id_ + + +def parse_args(): + argp = argparse.ArgumentParser(description=__doc__) + argp.add_argument('-o', '--out', required=True, + help='Destination for the created snapshot file') + argp.add_argument('-n', '--nodes', action='append', required=True, + help='CSV file containing graph nodes (vertices)') + argp.add_argument('-r', '--relationships', default=[], action='append', + help='CSV file containing graph relationships (edges)') + argp.add_argument('--overwrite', action='store_true', default=False, + help='Overwrite the output file if it exists') + argp.add_argument('--log_level', default='WARNING', + choices=['INFO', 'WARNING', 'DEBUG'], + help='Log level, default is WARNING') + argp.add_argument('--array-delimiter', default=';', + help='Delimiter between elements of array values, ' + "default is ';'") + argp.add_argument('--csv-delimiter', default=',', + help='Delimiter between each field in the CSV, ' + "default is ','") + argp.add_argument('--skip-duplicate-nodes', action='store_true', default=False, + help='Skip duplicate nodes or raise an error (default)') + return argp.parse_args() + + +def get_field_name_and_type(field): + '''Return (field_name, field_type) from the field string. + If there is no type, field_type is returned as None.''' + field_name_and_type = field.split(':', maxsplit=1) + name = field_name_and_type[0] + if len(field_name_and_type) == 1: + return name, None + field_type = field_name_and_type[1].strip().lower() + return name, field_type + + +def get_id_space(field_type): + group_start = field_type.find('(') + if group_start == -1: + return None + return field_type[1 + field_type.find('('):-1] + + +def write_node_row(node_row, array_delimiter, encoder): + node_id = None + node_labels = [] + properties = {} + for field, value in node_row.items(): + value = value.strip() + name, field_type = get_field_name_and_type(field) + if field_type is not None and field_type.startswith('id'): + if node_id is not None: + raise ValueError('Only one node ID must be specified') + node_id = NodeId(value, get_id_space(field_type)) + elif field_type == 'label': + labels = map(str.strip, value.split(array_delimiter)) + node_labels.extend(label for label in labels if label) + elif field_type != 'ignore': + # Everything else is a property. + # Missing field_type defaults to string. + if not field_type: + field_type = 'string' + properties[name] = csv_to_py_val(value, field_type, array_delimiter) + if node_id is None: + raise ValueError('Node ID must be specified') + encoder.write_node(node_id, node_labels, properties) + + +def convert_nodes(node_filenames, csv_delimiter, array_delimiter, encoder): + node_count = 0 + for node_filename in node_filenames: + with open(node_filename) as node_file: + nodes = csv.DictReader(node_file, delimiter=csv_delimiter) + for node in nodes: + write_node_row(node, array_delimiter, encoder) + node_count += 1 + return node_count + + +def write_relationship_row(relationship_row, array_delimiter, encoder): + start_id = None + end_id = None + relationship_type = None + properties = {} + for field, value in relationship_row.items(): + value = value.strip() + name, field_type = get_field_name_and_type(field) + if field_type is not None and field_type.startswith('start_id'): + if start_id is not None: + raise ValueError('Only one node ID must be specified') + start_id = NodeId(value, get_id_space(field_type)) + elif field_type is not None and field_type.startswith('end_id'): + if end_id is not None: + raise ValueError('Only one node ID must be specified') + end_id = NodeId(value, get_id_space(field_type)) + elif field_type == 'type': + if relationship_type is not None: + raise ValueError('Only one relationship TYPE must be specified') + relationship_type = value + elif field_type != 'ignore': + # Everything else is a property. + # Missing field_type defaults to string. + if not field_type: + field_type = 'string' + properties[name] = csv_to_py_val(value, field_type, array_delimiter) + if None in (start_id, end_id, relationship_type): + raise ValueError('Relationship TYPE, START_ID and END_ID must be set') + encoder.write_relationship(start_id, end_id, relationship_type, properties) + + +def convert_relationships(relationship_filenames, csv_delimiter, + array_delimiter, encoder): + relationship_count = 0 + for relationship_filename in relationship_filenames: + with open(relationship_filename) as relationship_file: + relationships = csv.DictReader(relationship_file, + delimiter=csv_delimiter) + for relationship in relationships: + write_relationship_row(relationship, array_delimiter, encoder) + relationship_count += 1 + return relationship_count + + +def main(): + args = parse_args() + logging.basicConfig(level=args.log_level) + all_input_names = ', '.join(it.chain(args.nodes, args.relationships)) + log.info("Converting {} to '{}'".format(all_input_names, args.out)) + with open(args.out, 'wb' if args.overwrite else 'xb') as dest_file: + hasher = Hasher() + encoder = BoltEncoder(dest_file, hasher, args.skip_duplicate_nodes) + # Snapshot file has the following contents in order: + # 1) list of label+property index + # 2) all nodes, sequantially, but not encoded as a list + # 3) all relationships, sequantially, but not encoded as a list + # 3) summary with node count, relationship count and hash digest + encoder.write_list([]) # Label + property indexes. + node_count = convert_nodes(args.nodes, args.csv_delimiter, + args.array_delimiter, encoder) + relationship_count = convert_relationships(args.relationships, + args.csv_delimiter, + args.array_delimiter, + encoder) + encoder.write_summary(node_count, relationship_count) + log.info("Created '{}'".format(args.out)) + + +if __name__ == '__main__': + main()