Reuse sessions in bipartite
Reviewers: florijan, mferencevic Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1028
This commit is contained in:
parent
60f4db2b9f
commit
73c01cfb73
@ -8,8 +8,10 @@ Large bipartite graph stress test.
|
||||
import logging
|
||||
import multiprocessing
|
||||
import time
|
||||
import atexit
|
||||
import os
|
||||
|
||||
from common import connection_argument_parser, argument_session, assert_equal,\
|
||||
from common import connection_argument_parser, assert_equal, argument_driver, \
|
||||
OutputData, execute_till_success, batch, render
|
||||
|
||||
|
||||
@ -45,6 +47,31 @@ args = parse_args()
|
||||
output_data = OutputData()
|
||||
|
||||
|
||||
# This class is used to create and cache sessions. A session is cached by the
# args used to create it and the pid of the process in which it was created.
# This makes it easy to reuse sessions with python multiprocessing primitives
# like Pool.map.
class SessionCache:
    # Maps (sorted-less tuple of vars(args) items..., pid) -> (driver, session).
    cache = {}

    @staticmethod
    def argument_session(args):
        '''
        Returns a session for the given args, reusing a cached one when this
        process has already created it; otherwise builds a new driver and
        session and caches the pair.

        :param args: parsed command line arguments used to build the driver
        :return: Bolt session bound to the current process
        '''
        key = tuple(vars(args).items()) + (os.getpid(),)
        # Single dict lookup instead of a contains-check followed by a fetch.
        cached = SessionCache.cache.get(key)
        if cached is not None:
            return cached[1]
        driver = argument_driver(args)
        session = driver.session()
        SessionCache.cache[key] = (driver, session)
        return session

    @staticmethod
    def cleanup():
        '''
        Closes every cached session and driver. Registered with atexit so the
        resources are released when the process terminates.
        '''
        # The keys are irrelevant here; iterate the values directly.
        for driver, session in SessionCache.cache.values():
            session.close()
            driver.close()


atexit.register(SessionCache.cleanup)
|
||||
|
||||
|
||||
def create_u_v_edges(u):
    '''
    Creates edges from the :U node with the given id towards every :V node,
    counting how many query executions had to be retried.

    :param u: id of the :U node to connect from
    :return: tuple (worker_id, create execution time, time unit, no_failures)
    '''
    start_time = time.time()
    # Reuse the per-process cached session instead of opening a new one.
    session = SessionCache.argument_session(args)
    no_failures = 0
    match_u = 'MATCH (u:U {id: %d})' % u
    if args.edge_batching:
        # TODO: try to randomize execution, the execution time should
        # be smaller, add randomize flag
        for v_id_batch in batch(range(args.v_count), args.edge_batch_size):
            match_v = render(" MATCH (v{0}:V {{id: {0}}})", v_id_batch)
            create_u = render(" CREATE (u)-[:R]->(v{0})", v_id_batch)
            query = match_u + "".join(match_v) + "".join(create_u)
            no_failures += execute_till_success(session, query)[1]
    else:
        no_failures += execute_till_success(
            session, match_u + ' MATCH (v:V) CREATE (u)-[:R]->(v)')[1]
    end_time = time.time()
    return u, end_time - start_time, "s", no_failures
|
||||
def traverse_from_u_worker(u):
    '''
    Traverses edges starting from an element of U set.
    Traversed labels are: :U -> :V -> :U.

    :param u: id of the :U node to start the traversal from
    :return: tuple (worker_id, traversal execution time, time unit)
    '''
    # Reuse the per-process cached session instead of opening a new one.
    session = SessionCache.argument_session(args)
    start_time = time.time()
    assert_equal(
        args.u_count * args.v_count - args.v_count,  # cypher morphism
        session.run("MATCH (u1:U {id: %s})-[e1]->(v:V)<-[e2]-(u2:U) "
                    "RETURN count(v) AS cnt" % u).data()[0]['cnt'],
        "Number of traversed edges started "
        "from U(id:%s) is wrong!. " % u +
        "Expected: %s Actual: %s")
    end_time = time.time()
    return u, end_time - start_time, 's'
|
||||
|
||||
|
||||
def traverse_from_v_worker(v):
    '''
    Traverses edges starting from an element of V set.
    Traversed labels are: :V -> :U -> :V.

    :param v: id of the :V node to start the traversal from
    :return: tuple (worker_id, traversal execution time, time unit)
    '''
    # Reuse the per-process cached session instead of opening a new one.
    session = SessionCache.argument_session(args)
    start_time = time.time()
    assert_equal(
        args.u_count * args.v_count - args.u_count,  # cypher morphism
        session.run("MATCH (v1:V {id: %s})<-[e1]-(u:U)-[e2]->(v2:V) "
                    "RETURN count(u) AS cnt" % v).data()[0]['cnt'],
        "Number of traversed edges started "
        "from V(id:%s) is wrong!. " % v +
        "Expected: %s Actual: %s")
    end_time = time.time()
    return v, end_time - start_time, 's'
|
||||
|
||||
|
||||
@ -115,86 +142,86 @@ def execution_handler():
|
||||
Initializes client processes, database and starts the execution.
|
||||
'''
|
||||
# instance cleanup
|
||||
with argument_session(args) as session:
|
||||
start_time = time.time()
|
||||
session = SessionCache.argument_session(args)
|
||||
start_time = time.time()
|
||||
|
||||
# clean existing database
|
||||
session.run('MATCH (n) DETACH DELETE n').consume()
|
||||
cleanup_end_time = time.time()
|
||||
output_data.add_measurement("cleanup_time",
|
||||
cleanup_end_time - start_time)
|
||||
# clean existing database
|
||||
session.run('MATCH (n) DETACH DELETE n').consume()
|
||||
cleanup_end_time = time.time()
|
||||
output_data.add_measurement("cleanup_time",
|
||||
cleanup_end_time - start_time)
|
||||
|
||||
# create U vertices
|
||||
for b in batch(render('CREATE (:U {{id: {}}})', range(args.u_count)),
|
||||
args.vertex_batch_size):
|
||||
session.run(" ".join(b)).consume()
|
||||
# create V vertices
|
||||
for b in batch(render('CREATE (:V {{id: {}}})', range(args.v_count)),
|
||||
args.vertex_batch_size):
|
||||
session.run(" ".join(b)).consume()
|
||||
vertices_create_end_time = time.time()
|
||||
# create U vertices
|
||||
for b in batch(render('CREATE (:U {{id: {}}})', range(args.u_count)),
|
||||
args.vertex_batch_size):
|
||||
session.run(" ".join(b)).consume()
|
||||
# create V vertices
|
||||
for b in batch(render('CREATE (:V {{id: {}}})', range(args.v_count)),
|
||||
args.vertex_batch_size):
|
||||
session.run(" ".join(b)).consume()
|
||||
vertices_create_end_time = time.time()
|
||||
output_data.add_measurement(
|
||||
'vertices_create_time',
|
||||
vertices_create_end_time - cleanup_end_time)
|
||||
|
||||
# concurrent create execution & tests
|
||||
with multiprocessing.Pool(args.worker_count) as p:
|
||||
create_edges_start_time = time.time()
|
||||
for worker_id, create_time, time_unit, no_failures in \
|
||||
p.map(create_u_v_edges, [i for i in range(args.u_count)]):
|
||||
log.info('Worker ID: %s; Create time: %s%s Failures: %s' %
|
||||
(worker_id, create_time, time_unit, no_failures))
|
||||
create_edges_end_time = time.time()
|
||||
output_data.add_measurement(
|
||||
'vertices_create_time',
|
||||
vertices_create_end_time - cleanup_end_time)
|
||||
|
||||
# concurrent create execution & tests
|
||||
with multiprocessing.Pool(args.worker_count) as p:
|
||||
create_edges_start_time = time.time()
|
||||
for worker_id, create_time, time_unit, no_failures in \
|
||||
p.map(create_u_v_edges, [i for i in range(args.u_count)]):
|
||||
log.info('Worker ID: %s; Create time: %s%s Failures: %s' %
|
||||
(worker_id, create_time, time_unit, no_failures))
|
||||
create_edges_end_time = time.time()
|
||||
output_data.add_measurement(
|
||||
'edges_create_time',
|
||||
create_edges_end_time - create_edges_start_time)
|
||||
|
||||
# check total number of edges
|
||||
assert_equal(
|
||||
args.v_count * args.u_count,
|
||||
session.run(
|
||||
'MATCH ()-[r]->() '
|
||||
'RETURN count(r) AS cnt').data()[0]['cnt'],
|
||||
"Total number of edges isn't correct! Expected: %s Actual: %s")
|
||||
|
||||
# check traversals starting from all elements of U
|
||||
traverse_from_u_start_time = time.time()
|
||||
for u, traverse_u_time, time_unit in \
|
||||
p.map(traverse_from_u_worker,
|
||||
[i for i in range(args.u_count)]):
|
||||
log.info("U {id: %s} %s%s" % (u, traverse_u_time, time_unit))
|
||||
traverse_from_u_end_time = time.time()
|
||||
output_data.add_measurement(
|
||||
'traverse_from_u_time',
|
||||
traverse_from_u_end_time - traverse_from_u_start_time)
|
||||
|
||||
# check traversals starting from all elements of V
|
||||
traverse_from_v_start_time = time.time()
|
||||
for v, traverse_v_time, time_unit in \
|
||||
p.map(traverse_from_v_worker,
|
||||
[i for i in range(args.v_count)]):
|
||||
log.info("V {id: %s} %s%s" % (v, traverse_v_time, time_unit))
|
||||
traverse_from_v_end_time = time.time()
|
||||
output_data.add_measurement(
|
||||
'traverse_from_v_time',
|
||||
traverse_from_v_end_time - traverse_from_v_start_time)
|
||||
|
||||
# check total number of vertices
|
||||
assert_equal(
|
||||
args.v_count + args.u_count,
|
||||
session.run('MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt'],
|
||||
"Total number of vertices isn't correct! Expected: %s Actual: %s")
|
||||
'edges_create_time',
|
||||
create_edges_end_time - create_edges_start_time)
|
||||
|
||||
# check total number of edges
|
||||
assert_equal(
|
||||
args.v_count * args.u_count,
|
||||
session.run(
|
||||
'MATCH ()-[r]->() RETURN count(r) AS cnt').data()[0]['cnt'],
|
||||
'MATCH ()-[r]->() '
|
||||
'RETURN count(r) AS cnt').data()[0]['cnt'],
|
||||
"Total number of edges isn't correct! Expected: %s Actual: %s")
|
||||
|
||||
end_time = time.time()
|
||||
output_data.add_measurement("total_execution_time",
|
||||
end_time - start_time)
|
||||
# check traversals starting from all elements of U
|
||||
traverse_from_u_start_time = time.time()
|
||||
for u, traverse_u_time, time_unit in \
|
||||
p.map(traverse_from_u_worker,
|
||||
[i for i in range(args.u_count)]):
|
||||
log.info("U {id: %s} %s%s" % (u, traverse_u_time, time_unit))
|
||||
traverse_from_u_end_time = time.time()
|
||||
output_data.add_measurement(
|
||||
'traverse_from_u_time',
|
||||
traverse_from_u_end_time - traverse_from_u_start_time)
|
||||
|
||||
# check traversals starting from all elements of V
|
||||
traverse_from_v_start_time = time.time()
|
||||
for v, traverse_v_time, time_unit in \
|
||||
p.map(traverse_from_v_worker,
|
||||
[i for i in range(args.v_count)]):
|
||||
log.info("V {id: %s} %s%s" % (v, traverse_v_time, time_unit))
|
||||
traverse_from_v_end_time = time.time()
|
||||
output_data.add_measurement(
|
||||
'traverse_from_v_time',
|
||||
traverse_from_v_end_time - traverse_from_v_start_time)
|
||||
|
||||
# check total number of vertices
|
||||
assert_equal(
|
||||
args.v_count + args.u_count,
|
||||
session.run('MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt'],
|
||||
"Total number of vertices isn't correct! Expected: %s Actual: %s")
|
||||
|
||||
# check total number of edges
|
||||
assert_equal(
|
||||
args.v_count * args.u_count,
|
||||
session.run(
|
||||
'MATCH ()-[r]->() RETURN count(r) AS cnt').data()[0]['cnt'],
|
||||
"Total number of edges isn't correct! Expected: %s Actual: %s")
|
||||
|
||||
end_time = time.time()
|
||||
output_data.add_measurement("total_execution_time",
|
||||
end_time - start_time)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -175,6 +175,8 @@ def bolt_session(url, auth, ssl=False):
|
||||
driver.close()
|
||||
|
||||
|
||||
# If you are using a session with multiprocessing, take a look at SessionCache
|
||||
# in bipartite for an idea of how to reuse sessions.
|
||||
def argument_session(args):
|
||||
'''
|
||||
:return: Bolt session context manager based on program arguments
|
||||
|
Loading…
Reference in New Issue
Block a user