diff --git a/tools/neo_to_memgraph.py b/tools/neo_to_memgraph.py index 5e1e4d590..f4fa2f937 100755 --- a/tools/neo_to_memgraph.py +++ b/tools/neo_to_memgraph.py @@ -7,7 +7,11 @@ into a Memgraph database. ''' import logging +import json +import os from time import time +from datetime import datetime +from argparse import ArgumentParser from neo4j.v1 import GraphDatabase, basic_auth @@ -15,18 +19,26 @@ log = logging.getLogger(__name__) TEMP_ID = "__memgraph_temp_id_314235423" +TEMP_LABEL = "__memgraph_temp_label_1414213562" def parse_args(): - # TODO parse args properly from the cmdline - args = type('', (), {})() - args.neo_url = "127.0.0.1:7687" - args.neo_user = "neo4j" - args.neo_password = "1234" - args.neo_ssl = False - args.memgraph_url = "127.0.0.1:7688" - args.logging = "DEBUG" - return args + argp = ArgumentParser(description=__doc__) + argp.add_argument("--neo-url", default = "127.0.0.1:7687", + help = "Neo4j url, default 127.0.0.1:7687.") + argp.add_argument("--neo-user", default = "neo4j", + help = "Neo4j username, default neo4j.") + argp.add_argument("--neo-password", default = "1234", + help = "Neo4j password, default 1234.") + argp.add_argument("--neo-ssl", default = False, action = "store_true", + help = "Encryption for neo4j auth data, default False") + argp.add_argument("--memgraph-url", default = "127.0.0.1:7688", + help = "Memgraph url, default 127.0.0.1:7688.") + argp.add_argument("--logging", default = "DEBUG", choices = ["INFO", "DEBUG"], + help = "Logging level, default debug.") + argp.add_argument("--json-storage", default = "json", + help = "Storage for JSON files.") + return argp.parse_args() def create_vertex_cypher(vertex): @@ -34,66 +46,169 @@ def create_vertex_cypher(vertex): Helper function that generates a cypher query for creting a vertex based on the given Bolt vertex. 
""" + labels = "" if vertex.labels: - labels = ":" + ":".join(vertex.labels) - else: - labels = "" + labels += ":" + ":".join(vertex.labels) vertex.properties[TEMP_ID] = vertex.id properties = ", ".join('%s: %r' % kv for kv in vertex.properties.items()) return "CREATE (%s {%s})" % (labels, properties) -def create_edge_cypher(edge): +def create_edge_cypher(edge, edge_num): """ Helper function that generates a cypher query for creting a edge based on the given Bolt edge. """ properties = ", ".join('%s: %r' % kv for kv in edge.properties.items()) - return "MATCH (from {%s: %r}), (to {%s: %r}) " \ - "CREATE (from)-[:%s {%s}]->(to)" % ( - TEMP_ID, edge.start, TEMP_ID, edge.end, edge.type, properties) + return ["(from%s:%s {%s: %r}), (to%s:%s {%s: %r})" \ + % (edge_num, TEMP_LABEL, TEMP_ID, edge.start, edge_num, TEMP_LABEL, TEMP_ID, edge.end), + "CREATE (from%s)-[:%s {%s}]->(to%s)" \ + % (edge_num, edge.type, properties, edge_num)] + +def vertex_to_dict(vertex): + """ + Returns dictionary which represents one vertex with labels and + properties. + + :param vertex: Graph vertex + """ + return {"labels": list(vertex.labels), "properties": vertex.properties} + +def edge_to_dict(edge): + """ + Returns dictionary which represents one edge with type, start vertex, + end vertex and properties. + + :param edge: Graph edge + """ + return {"type": edge.type, "properties": edge.properties, \ + "start": edge.start, "end": edge.end} + +def create_json_file(storage, timestamp, element, batch_index, content): + """ + Creates json file with given content and path + storage/timestamp/element and file name is batch_index.json. Creates + directories where files are stored if directories don't exist. 
+ + :param storage: str, path where all json files are stored + :param timestamp: str, timestamp of the current transfer + :param element: str, expected vertices or edges, which elements are + stored in json + :param batch_index: int, index of the current batch, used in file name + :param content: list, content which will be dumped in file + """ + json_file = os.path.join(storage, timestamp, element, + str(batch_index) + ".json") + os.makedirs(os.path.dirname(json_file), exist_ok = True) + log.debug("JSON batch content: %s", content) + with open(json_file, 'w') as f: + json.dump(content, f, indent = 2) -def transfer(neo_driver, memgraph_driver): +def transfer(storage, neo_driver, memgraph_driver): """ Copies all the data from Neo4j to Memgraph. """ # TODO add error handling neo_session = neo_driver.session() memgraph_session = memgraph_driver.session() - # TODO when available add index on TEMP_ID + # Creating index + log.debug("Creating memgraph index on TEMP_LABEL and TEMP_ID.") + memgraph_session.run("CREATE INDEX ON :%s(%s)" % (TEMP_LABEL, TEMP_ID)) + neo_session.run("MATCH(n) SET n :%s, n.%s = ID(n)" % (TEMP_LABEL, TEMP_ID)) + neo_session.run("CREATE INDEX ON :%s(%s)" % (TEMP_LABEL, TEMP_ID)) - # transfer vertices - # TODO do it in batches - # load batches of let's say a few thousand, insert batches of let's - # say 100 + read_vertex_batch = 2 + write_vertex_batch = 3 + read_edge_batch = 2 + write_edge_batch = 3 vertex_count = 0 - for vertex in neo_session.run("MATCH (n) RETURN n"): - vertex = vertex['n'] - vertex_cypher = create_vertex_cypher(vertex) - log.debug("Vertex creaton cypher: %s", vertex_cypher) - memgraph_session.run(vertex_cypher).consume() - # TODO store all vertices into JSON, also in batches - # make destination and storage (if it should be stored) configurable - vertex_count += 1 - - # transfer edges - # TODO do it in batches, like with vertices edge_count = 0 - for edge in neo_session.run("MATCH ()-[r]->() return r"): - edge = edge['r'] - edge_cypher = create_edge_cypher(edge) 
- log.debug("Edge creation cypher: %s", edge_cypher) - memgraph_session.run(edge_cypher).consume() - # TODO store all edges into JSON, configure like edges - edge_count += 1 - # TODO when available remove index on TEMP_ID + cypher_query = "" + batch_count = 0 + timestamp = datetime.fromtimestamp(time()).strftime("%Y_%m_%d__%H_%M_%S") - memgraph_session.run("MATCH (n) REMOVE n.%s" % TEMP_ID) + def write_vertices(vertices): + nonlocal batch_count + cypher_query = "" + vertices_list = [] + for vertex in vertices: + cypher_query += create_vertex_cypher(vertex) + vertices_list.append(vertex_to_dict(vertex)) + create_json_file(storage, timestamp, "vertices", batch_count, vertices_list) + log.debug("Vertex create on cypher: %s", cypher_query) + memgraph_session.run(cypher_query).consume() + batch_count += 1 + vertices[:] = [] + def write_edges(edges): + nonlocal batch_count + cypher_query = "" + edge_num = 0 + edges_list = [] + for edge in edges: + edges_list.append(edge_to_dict(edge)) + edge_queries = create_edge_cypher(edge, edge_num) + edge_num += 1 + if cypher_query: + cypher_query = edge_queries[0] + ", " + cypher_query + \ + " " + edge_queries[1] + else: + cypher_query = ' '.join(edge_queries) + create_json_file(storage, timestamp, "edges", batch_count, edges_list) + cypher_query = "MATCH " + cypher_query + log.debug("Edge create on cypher: %s", cypher_query) + memgraph_session.run(cypher_query).consume() + batch_count += 1 + edges[:] = [] + + # Vertex transfer + start_id = 0 + vertices_batch = [] + while True: + read_vertices_in_batch = 0 + vertices = neo_session.run("MATCH(n:%s) WHERE n.%s>=%s RETURN n " \ + "ORDER BY ID(n) LIMIT %s" % (TEMP_LABEL, TEMP_ID, start_id, read_vertex_batch)) + for vertex in vertices: + vertex = vertex['n'] + vertices_batch.append(vertex) + if len(vertices_batch) >= write_vertex_batch: + write_vertices(vertices_batch) + start_id = vertex.id + read_vertices_in_batch += 1 + vertex_count += 1 + start_id += 1 + if read_vertices_in_batch != 
read_vertex_batch: + break + if len(vertices_batch) > 0: + write_vertices(vertices_batch) + + max_id = neo_session.run("MATCH(n) RETURN MAX(ID(n)) AS id").peek()['id'] + + start_id = 0 + batch_count = 0 + edges_batch = [] + while max_id is not None and start_id <= max_id: + edges = neo_session.run("MATCH (n:%s)-[r]->() WHERE n.%s>=%s AND " \ + "n.%s<%s RETURN r" % (TEMP_LABEL, TEMP_ID, start_id, TEMP_ID, start_id + read_edge_batch)) + start_id = start_id + read_edge_batch + for edge in edges: + edge_count += 1 + edge = edge['r'] + edges_batch.append(edge) + if len(edges_batch) >= write_edge_batch: + write_edges(edges_batch) + if len(edges_batch) > 0: + write_edges(edges_batch) + + # TODO Drop index in memgraph when it will be supported + log.debug("Removing TEMP_LABEL and TEMP_ID") + memgraph_session.run("MATCH (n) REMOVE n:%s, n.%s" % (TEMP_LABEL, TEMP_ID)) + neo_session.run("MATCH (n) REMOVE n:%s, n.%s" % (TEMP_LABEL, TEMP_ID)) + neo_session.run("DROP INDEX ON :%s(%s)" % (TEMP_LABEL, TEMP_ID)) log.info("Created %d vertiecs and %d edges", vertex_count, edge_count) @@ -115,7 +230,7 @@ def main(): encrypted=False) start_time = time() - transfer(neo_driver, memgraph_driver) + transfer(args.json_storage, neo_driver, memgraph_driver) log.info("Import complete in %.2f seconds", time() - start_time) pass