Upgraded neo2memgraph script

Reviewers: florijan, buda

Reviewed By: florijan, buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D542
This commit is contained in:
matej.gradicek 2017-07-14 16:52:01 +02:00
parent 4a500b99fa
commit 7b7aad196a

View File

@ -7,7 +7,11 @@ into a Memgraph database.
'''
import logging
import json
import os
from time import time
from datetime import datetime
from argparse import ArgumentParser
from neo4j.v1 import GraphDatabase, basic_auth
@ -15,18 +19,26 @@ log = logging.getLogger(__name__)
TEMP_ID = "__memgraph_temp_id_314235423"
TEMP_LABEL = "__memgraph_temp_label_1414213562"
def parse_args():
# TODO parse args properly from the cmdline
args = type('', (), {})()
args.neo_url = "127.0.0.1:7687"
args.neo_user = "neo4j"
args.neo_password = "1234"
args.neo_ssl = False
args.memgraph_url = "127.0.0.1:7688"
args.logging = "DEBUG"
return args
argp = ArgumentParser(description=__doc__)
argp.add_argument("--neo-url", default = "127.0.0.1:7687",
help = "Neo4j url, default 127.0.0.1:7687.")
argp.add_argument("--neo-user", default = "neo4j",
help = "Neo4j username, default neo4j.")
argp.add_argument("--neo-password", default = "1234",
help = "Neo4j password, default 1234.")
argp.add_argument("--neo-ssl", default = False, choices = [True, False],
help = "Encryption for neo4j auth data, default False")
argp.add_argument("--memgraph-url", default = "127.0.0.1:7688",
help = "Memgraph url, default 127.0.0.1:7688.")
argp.add_argument("--logging", default = "DEBUG", choices = ["INFO", "DEBUG"],
help = "Logging level, default debug.")
argp.add_argument("--json-storage", default = "json",
help = "Storage for JSON files.")
return argp.parse_args()
def create_vertex_cypher(vertex):
@ -34,66 +46,169 @@ def create_vertex_cypher(vertex):
Helper function that generates a cypher query for creting
a vertex based on the given Bolt vertex.
"""
labels = ""
if vertex.labels:
labels = ":" + ":".join(vertex.labels)
else:
labels = ""
labels += ":" + ":".join(vertex.labels)
vertex.properties[TEMP_ID] = vertex.id
properties = ", ".join('%s: %r' % kv for kv
in vertex.properties.items())
return "CREATE (%s {%s})" % (labels, properties)
def create_edge_cypher(edge):
def create_edge_cypher(edge, edge_num):
"""
Helper function that generates a cypher query for creting
a edge based on the given Bolt edge.
"""
properties = ", ".join('%s: %r' % kv for kv
in edge.properties.items())
return "MATCH (from {%s: %r}), (to {%s: %r}) " \
"CREATE (from)-[:%s {%s}]->(to)" % (
TEMP_ID, edge.start, TEMP_ID, edge.end, edge.type, properties)
return ["(from%s {%s: %r}), (to%s {%s: %r})" \
% (edge_num, TEMP_ID, edge.start, edge_num, TEMP_ID, edge.end),
"CREATE (from%s)-[:%s {%s}]->(to%s)" \
% (edge_num, edge.type, properties, edge_num)]
def vertex_to_dict(vertex):
"""
Returns dictionary which represents one vertex with labels and
properties.
:param vertex: Graph vertex
"""
return {"labels": list(vertex.labels), "properties": vertex.properties}
def edge_to_dict(edge):
"""
Returns dictionary which represents one edge with type, start vertex,
end vertex and properties.
:param edge: Graph edge
"""
return {"type": edge.type, "properties": edge.properties, \
"start": edge.start, "end": edge.end}
def create_json_file(storage, timestamp, element, batch_index, content):
"""
Creates json file with given content and path
storage/timestamp/element and file name is batch_count.json. Creates
directories where files are stored if directories don't exist.
:param storage: str, path where all json files are stored
:param timestamp: str, timestamp of the current transfer
:param element: str, expected vertex or edge, which elements are
stored in json
:param batch_index: int, index of the current batch, used in file name
:param content: list, contet which will be dumped in file
"""
json_file = os.path.join(storage, timestamp, element,
str(batch_index) + ".json")
os.makedirs(os.path.dirname(json_file), exist_ok = True)
print(content)
with open(json_file, 'w') as f:
json.dump(content, f, indent = 2)
def transfer(neo_driver, memgraph_driver):
def transfer(storage, neo_driver, memgraph_driver):
""" Copies all the data from Neo4j to Memgraph. """
# TODO add error handling
neo_session = neo_driver.session()
memgraph_session = memgraph_driver.session()
# TODO when available add index on TEMP_ID
# Creating index
log.debug("Creating memgraph index on TEMP_LABEL and TEMP_ID.")
memgraph_session.run("CREATE INDEX ON :%s(%s)" % (TEMP_LABEL, TEMP_ID))
neo_session.run("MATCH(n) SET n :%s, n.%s = ID(n)" % (TEMP_LABEL, TEMP_ID))
neo_session.run("CREATE INDEX ON :%s(%s)" % (TEMP_LABEL, TEMP_ID))
# transfer vertices
# TODO do it in batches
# load batches of let's say a few thousand, insert batches of let's
# say 100
read_vertex_batch = 2
write_vertex_batch = 3
read_edge_batch = 2
write_edge_batch = 3
vertex_count = 0
for vertex in neo_session.run("MATCH (n) RETURN n"):
vertex = vertex['n']
vertex_cypher = create_vertex_cypher(vertex)
log.debug("Vertex creaton cypher: %s", vertex_cypher)
memgraph_session.run(vertex_cypher).consume()
# TODO store all vertices into JSON, also in batches
# make destination and storage (if it should be stored) configurable
vertex_count += 1
# transfer edges
# TODO do it in batches, like with vertices
edge_count = 0
for edge in neo_session.run("MATCH ()-[r]->() return r"):
edge = edge['r']
edge_cypher = create_edge_cypher(edge)
log.debug("Edge creation cypher: %s", edge_cypher)
memgraph_session.run(edge_cypher).consume()
# TODO store all edges into JSON, configure like edges
edge_count += 1
# TODO when available remove index on TEMP_ID
cypher_query = ""
batch_count = 0
timestamp = datetime.fromtimestamp(time()).strftime("%Y_%m_%d__%H_%M_%S")
memgraph_session.run("MATCH (n) REMOVE n.%s" % TEMP_ID)
def write_vertices(vertices):
nonlocal batch_count
cypher_query = ""
vertices_list = []
for vertex in vertices:
cypher_query += create_vertex_cypher(vertex)
vertices_list.append(vertex_to_dict(vertex))
create_json_file(storage, timestamp, "vertices", batch_count, vertices_list)
log.debug("Vertex create on cypher: %s" % (cypher_query))
memgraph_session.run(cypher_query).consume()
batch_count += 1
vertices[:] = []
def write_edges(edges):
nonlocal batch_count
cypher_query = ""
edge_num = 0
edges_list = []
for edge in edges:
edges_list.append(edge_to_dict(edge))
edge_queries = create_edge_cypher(edge, edge_num)
edge_num += 1
if cypher_query:
cypher_query = edge_queries[0] + ", " + cypher_query + \
" " + edge_queries[1]
else:
cypher_query = ' '.join(edge_queries)
create_json_file(storage, timestamp, "edges", batch_count, edges_list)
cypher_query = "MATCH " + cypher_query
log.debug("Edge create on cypher: %s" % (cypher_query))
memgraph_session.run(cypher_query).consume()
batch_count += 1
edges[:] = []
# Vertex transfer
start_id = 0
vertices_batch = []
while True:
read_vertices_in_batch = 0
vertices = neo_session.run("MATCH(n) WHERE n.%s>=%s RETURN n " \
"ORDER BY ID(n) LIMIT %s" % (TEMP_ID, start_id, read_vertex_batch))
for vertex in vertices:
vertex = vertex['n']
vertices_batch.append(vertex)
if len(vertices_batch) >= write_vertex_batch:
write_vertices(vertices_batch)
start_id = vertex.id
read_vertices_in_batch += 1
vertex_count += 1
start_id += 1
if read_vertices_in_batch != read_vertex_batch:
break
if len(vertices_batch) > 0:
write_vertices(vertices_batch)
max_id = neo_session.run("MATCH(n) RETURN MAX(ID(n)) AS id").peek()['id']
start_id = 0
batch_count = 0
edges_batch = []
while start_id <= max_id:
edges = neo_session.run("MATCH (n)-[r]->() WHERE n.%s>=%s AND " \
"n.%s<%s RETURN r" % (TEMP_ID, start_id, TEMP_ID, start_id + read_edge_batch))
start_id = start_id + read_edge_batch
for edge in edges:
edge_count += 1
edge = edge['r']
edges_batch.append(edge)
if len(edges_batch) >= write_edge_batch:
write_edges(edges_batch)
if len(edges_batch) > 0:
write_edges(edges_batch)
# TODO Drop index in memgraph when it will be supported
log.debug("Removing TEMP_LABEL and TEMP_ID")
memgraph_session.run("MATCH (n) REMOVE n:%s, n.%s" % (TEMP_LABEL, TEMP_ID))
neo_session.run("MATCH (n) REMOVE n:%s, n.%s" % (TEMP_LABEL, TEMP_ID))
neo_session.run("DROP INDEX ON :%s(%s)" % (TEMP_LABEL, TEMP_ID))
log.info("Created %d vertiecs and %d edges", vertex_count, edge_count)
@ -115,7 +230,7 @@ def main():
encrypted=False)
start_time = time()
transfer(neo_driver, memgraph_driver)
transfer(args.json_storage, neo_driver, memgraph_driver)
log.info("Import complete in %.2f seconds", time() - start_time)
pass