Bipartite stress test.

Summary: Bipartite stress test, plus common methods useful for testing with the Python Bolt driver.

Reviewers: mislav.bradac, florijan

Reviewed By: florijan

Subscribers: florijan, pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D423
This commit is contained in:
Marko Budiselic 2017-06-06 11:00:59 +02:00
parent afbb940a05
commit 28173eaa3e
3 changed files with 391 additions and 49 deletions

217
tests/stress/bipartite.py Normal file
View File

@ -0,0 +1,217 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Large bipartite graph stress test.
'''
import logging
import multiprocessing
import time
from common import parse_connection_arguments, argument_session, \
assert_equal, batch_rendered_strings, \
OutputData, execute_till_success
def parse_args():
    '''
    Parse the command line arguments for the bipartite stress test.

    Connection-related options come from the shared connection parser;
    the test-specific options are registered on top of it.

    :return: argparse.Namespace with connection and test parameters
    '''
    arg_parser = parse_connection_arguments()
    arg_parser.add_argument(
        '--no-workers', type=int, default=multiprocessing.cpu_count(),
        help='Number of concurrent workers.')
    arg_parser.add_argument(
        '--no-u', type=int, default=100,
        help='Size of U set in the bipartite graph.')
    arg_parser.add_argument(
        '--no-v', type=int, default=100,
        help='Size of V set in the bipartite graph.')
    arg_parser.add_argument(
        '--vertex-batch-size', type=int, default=100,
        help="Create vertices in batches of this size.")
    arg_parser.add_argument(
        '--edge-batching', action='store_true',
        help='Create edges in batches.')
    arg_parser.add_argument(
        '--edge-batch-size', type=int, default=100,
        help='Number of edges in a batch when edges '
             'are created in batches.')
    return arg_parser.parse_args()
# Module-level logger for this stress test.
log = logging.getLogger(__name__)
# Arguments are parsed once at import time; worker functions read this global
# (required because multiprocessing workers cannot easily receive the parser).
args = parse_args()
# Accumulates measurements and statuses; dumped at the end of the run.
output_data = OutputData()
def create_u_v_edges(u):
    '''
    Creates edges from one vertex of the U set to all vertices of the V set,
    retrying each batch until it succeeds.

    :param u: int, id of the U vertex this worker is responsible for

    :return: tuple (u, create execution time, time unit, number of failures)
    '''
    start_time = time.time()
    with argument_session(args) as session:
        no_failures = 0
        # Every query starts by matching the single U vertex.
        match_u_query = 'MATCH (u:U {id: %s}) ' % u
        if args.edge_batching:
            # TODO: try to randomize execution, the execution time should
            # be smaller, add randomize flag
            # Outer loop: batches of MATCH clauses for V vertices; `dps` is
            # the list of (i, i) data points rendered into this batch.
            for batchm, dps in batch_rendered_strings(
                    'MATCH (v%s:V {id: %s})',
                    [(i, i) for i in range(args.no_v)],
                    args.edge_batch_size):
                # Inner loop: CREATE clauses for exactly the V vertices
                # matched above. NOTE: `dps` has at most edge_batch_size
                # entries, so this inner generator yields a single batch.
                for batchc, _ in batch_rendered_strings(
                        'CREATE (u)-[:R]->(v%s)',
                        [dpi for dpi, _ in dps],
                        args.edge_batch_size):
                    no_failures += execute_till_success(
                        session, match_u_query + batchm + batchc)
        else:
            # Single query creates all edges from u at once.
            no_failures += execute_till_success(
                session, match_u_query + 'MATCH (v:V) CREATE (u)-[:R]->(v)')
    end_time = time.time()
    return u, end_time - start_time, "s", no_failures
def traverse_from_u_worker(u):
    '''
    Traverses edges starting from an element of U set.
    Traversed labels are: :U -> :V -> :U.

    :param u: int, id of the starting U vertex
    :return: tuple (u, traversal time, time unit)
    '''
    with argument_session(args) as session:
        begin = time.time()
        # no_v paths per reachable U vertex, excluding u itself:
        # no_v * (no_u - 1) == no_u * no_v - no_v  (cypher morphism)
        expected_count = args.no_u * args.no_v - args.no_v
        query = ("MATCH (u1:U {id: %s})-[e1]->(v:V)<-[e2]-(u2:U) "
                 "RETURN count(v) AS cnt" % u)
        actual_count = session.run(query).data()[0]['cnt']
        message = ("Number of traversed edges started "
                   "from U(id:%s) is wrong!. " % u +
                   "Expected: %s Actual: %s")
        assert_equal(expected_count, actual_count, message)
        finish = time.time()
        return u, finish - begin, 's'
def traverse_from_v_worker(v):
    '''
    Traverses edges starting from an element of V set.
    Traversed labels are: :V -> :U -> :V.

    :param v: int, id of the starting V vertex
    :return: tuple (v, traversal time, time unit)
    '''
    with argument_session(args) as session:
        begin = time.time()
        # no_u paths per reachable V vertex, excluding v itself:
        # no_u * (no_v - 1) == no_u * no_v - no_u  (cypher morphism)
        expected_count = args.no_u * args.no_v - args.no_u
        query = ("MATCH (v1:V {id: %s})<-[e1]-(u:U)-[e2]->(v2:V) "
                 "RETURN count(u) AS cnt" % v)
        actual_count = session.run(query).data()[0]['cnt']
        message = ("Number of traversed edges started "
                   "from V(id:%s) is wrong!. " % v +
                   "Expected: %s Actual: %s")
        assert_equal(expected_count, actual_count, message)
        finish = time.time()
        return v, finish - begin, 's'
def execution_handler():
    '''
    Initializes client processes, database and starts the execution.

    Phases (each is timed and recorded into output_data):
        1. cleanup of any existing data,
        2. batched creation of all U and V vertices,
        3. concurrent creation of all U->V edges,
        4. concurrent traversal checks from U and from V,
        5. final vertex and edge count sanity checks.
    '''
    # instance cleanup
    with argument_session(args) as session:
        start_time = time.time()

        # clean existing database
        session.run('MATCH (n) DETACH DELETE n').consume()
        cleanup_end_time = time.time()
        output_data.add_measurement("cleanup_time",
                                    cleanup_end_time - start_time)

        # create vertices
        # create U vertices
        for vertex_batch, _ in batch_rendered_strings('CREATE (:U {id: %s})',
                                                      range(args.no_u),
                                                      args.vertex_batch_size):
            session.run(vertex_batch).consume()
        # create V vertices
        for vertex_batch, _ in batch_rendered_strings('CREATE (:V {id: %s})',
                                                      range(args.no_v),
                                                      args.vertex_batch_size):
            session.run(vertex_batch).consume()
        vertices_create_end_time = time.time()
        output_data.add_measurement(
            'vertices_create_time',
            vertices_create_end_time - cleanup_end_time)

        # concurrent create execution & tests
        with multiprocessing.Pool(args.no_workers) as p:
            create_edges_start_time = time.time()
            # one worker per U vertex; each returns its timing and failures
            for worker_id, create_time, time_unit, no_failures in \
                    p.map(create_u_v_edges, [i for i in range(args.no_u)]):
                log.info('Worker ID: %s; Create time: %s%s Failures: %s' %
                         (worker_id, create_time, time_unit, no_failures))
            create_edges_end_time = time.time()
            output_data.add_measurement(
                'edges_create_time',
                create_edges_end_time - create_edges_start_time)

            # check total number of edges
            assert_equal(
                args.no_v * args.no_u,
                session.run(
                    'MATCH ()-[r]->() '
                    'RETURN count(r) AS cnt').data()[0]['cnt'],
                "Total number of edges isn't correct! Expected: %s Actual: %s")

            # check traversals starting from all elements of U
            traverse_from_u_start_time = time.time()
            for u, traverse_u_time, time_unit in \
                    p.map(traverse_from_u_worker,
                          [i for i in range(args.no_u)]):
                log.info("U {id: %s} %s%s" % (u, traverse_u_time, time_unit))
            traverse_from_u_end_time = time.time()
            output_data.add_measurement(
                'traverse_from_u_time',
                traverse_from_u_end_time - traverse_from_u_start_time)

            # check traversals starting from all elements of V
            traverse_from_v_start_time = time.time()
            for v, traverse_v_time, time_unit in \
                    p.map(traverse_from_v_worker,
                          [i for i in range(args.no_v)]):
                log.info("V {id: %s} %s%s" % (v, traverse_v_time, time_unit))
            traverse_from_v_end_time = time.time()
            output_data.add_measurement(
                'traverse_from_v_time',
                traverse_from_v_end_time - traverse_from_v_start_time)

        # check total number of vertices
        assert_equal(
            args.no_v + args.no_u,
            session.run('MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt'],
            "Total number of vertices isn't correct! Expected: %s Actual: %s")

        # check total number of edges
        assert_equal(
            args.no_v * args.no_u,
            session.run(
                'MATCH ()-[r]->() RETURN count(r) AS cnt').data()[0]['cnt'],
            "Total number of edges isn't correct! Expected: %s Actual: %s")

        end_time = time.time()
        output_data.add_measurement("total_execution_time",
                                    end_time - start_time)
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # Record the test parameters so they appear in the final dump.
    output_data.add_status("stress_test_name", "bipartite")
    output_data.add_status("number_of_vertices", args.no_u + args.no_v)
    output_data.add_status("number_of_edges", args.no_u * args.no_v)
    output_data.add_status("vertex_batch_size", args.vertex_batch_size)
    output_data.add_status("edge_batching", args.edge_batching)
    output_data.add_status("edge_batch_size", args.edge_batch_size)
    execution_handler()
    # Print all collected statuses and measurements.
    output_data.console_dump()

161
tests/stress/common.py Normal file
View File

@ -0,0 +1,161 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Common methods for writing graph database
integration tests in python.
Only Bolt communication protocol is supported.
'''
import contextlib
from argparse import ArgumentParser
from neo4j.v1 import GraphDatabase, basic_auth
class OutputData:
    '''
    Collects test results and metadata and dumps them to the console.
    '''

    def __init__(self):
        # measurement tuples in the form (name, time, unit)
        self._measurements = []
        # status tuples in the form (name, printable value)
        self._statuses = []

    def add_measurement(self, name, time, unit="s"):
        '''
        Stores a single measurement.

        :param name: str, name of measurement
        :param time: float, time value
        :param unit: str, time unit
        '''
        self._measurements.append((name, time, unit))

    def add_status(self, name, status):
        '''
        Stores a single status data point.

        :param name: str, name of data point
        :param status: printable value
        '''
        self._statuses.append((name, status))

    def console_dump(self):
        '''
        Dumps all collected output data on the console output,
        statuses first, then measurements.
        '''
        lines = ["Output data:"]
        lines.extend(" %s: %s" % entry for entry in self._statuses)
        lines.extend(" %s: %s%s" % entry for entry in self._measurements)
        print("\n".join(lines))
def execute_till_success(session, query, max_retries=None):
    '''
    Executes a query within Bolt session until the query is
    successfully executed against the database.

    :param session: active Bolt session
    :param query: query to execute
    :param max_retries: optional int, maximum number of failed attempts
        before giving up; None (the default) retries forever, which is
        the original behaviour.

    :return: int, number of failures before the query succeeded

    :raises RuntimeError: if max_retries is given and exceeded
    '''
    no_failures = 0
    while True:
        try:
            session.run(query).consume()
            return no_failures
        # Broad catch is intentional: the Bolt driver may raise various
        # transient errors (e.g. serialization conflicts) that we retry on.
        except Exception:
            no_failures += 1
            if max_retries is not None and no_failures > max_retries:
                raise RuntimeError(
                    'Query %r failed %d times, giving up.'
                    % (query, no_failures))
def batch_rendered_strings(t, dps, bs=1):
    '''
    Batches rendered strings based on template and data points. Template is
    populated from a single data point and then more rendered strings
    are batched into a single string.

    :param t: str, template for the rendered string (for one data point)
    :param dps: sequence, iterator or generator of data points used to
        populate the template
    :param bs: int, batch size

    :returns: generator of (str, list of dps in this batch) pairs (the
        batched dps might be useful for further rendering)

    e.g. if t = "test %s", dps = range(1, 6), bs = 2,
    yields are going to be:
        "test 1 test 2", [1, 2]
        "test 3 test 4", [3, 4]
        "test 5", [5]
    '''
    # Materialize the input: the docstring promises iterator support, but
    # len() and slicing below require a sequence.
    dps = list(dps)
    no_dps = len(dps)
    for ndx in range(0, no_dps, bs):
        # Slicing already clamps at the end of the list, no min() needed.
        batch = dps[ndx:ndx + bs]
        yield ' '.join(t % dp for dp in batch), batch
def assert_equal(expected, actual, message):
    '''
    Compares expected and actual values. If values are not the same terminate
    the execution.

    :param expected: expected value
    :param actual: actual value
    :param message: str, message in case that the values are not equal, must
        contain two placeholders (%s) to print the values.

    :raises AssertionError: if the values differ
    '''
    # Raise explicitly instead of using the assert statement: asserts are
    # stripped when Python runs with -O, which would silently disable the
    # stress test's correctness checks.
    if expected != actual:
        raise AssertionError(message % (expected, actual))
def parse_connection_arguments():
    '''
    Parses arguments related to establishing database connection like
    host, port, username, etc.

    :return: An instance of ArgumentParser
    '''
    parser = ArgumentParser(description=__doc__)
    parser.add_argument('--endpoint', type=str, default='localhost:7687',
                        help='DBMS instance endpoint. '
                             'Bolt protocol is the only option.')
    parser.add_argument('--username', type=str, default='neo4j',
                        help='DBMS instance username.')
    # BUGFIX: type used to be int (while the default was already a str),
    # so any non-numeric password given on the command line crashed argparse.
    parser.add_argument('--password', type=str, default='1234',
                        help='DBMS instance password.')
    # BUGFIX: action used to be 'store_false' (SSL on by default, the flag
    # turning it OFF), which contradicts the flag's name. With 'store_true'
    # SSL is off unless --ssl-enabled is passed. NOTE(review): the parsed
    # value is currently not forwarded to the driver by argument_session.
    parser.add_argument('--ssl-enabled', action='store_true',
                        help="Is SSL enabled?")
    # NOTE(review): this parses sys.argv and discards the result; presumably
    # meant as early validation of the connection flags — confirm intent.
    parser.parse_known_args()
    return parser
@contextlib.contextmanager
def bolt_session(url, auth, ssl=False):
    '''
    with wrapper around Bolt session.

    :param url: str, e.g. "bolt://localhost:7687"
    :param auth: auth method, goes directly to the Bolt driver constructor
    :param ssl: bool, is ssl enabled
    '''
    bolt_driver = GraphDatabase.driver(url, auth=auth, encrypted=ssl)
    active_session = bolt_driver.session()
    try:
        yield active_session
    finally:
        # Close the session before its owning driver.
        active_session.close()
        bolt_driver.close()
def argument_session(args):
    '''
    Creates a Bolt session from parsed program arguments.

    :param args: namespace with endpoint, username and password attributes
    :return: Bolt session context manager based on program arguments
    '''
    url = 'bolt://' + args.endpoint
    credentials = basic_auth(args.username, args.password)
    # NOTE(review): args.ssl_enabled is parsed but not forwarded here, so
    # bolt_session always uses its default ssl=False — confirm intent.
    return bolt_session(url, credentials)

View File

@ -7,15 +7,13 @@ Large scale stress test. Tests only node creation.
The idea is to run this test on machines with huge amount of memory e.g. 2TB.
'''
import contextlib
import logging
import multiprocessing
import random
import time
from argparse import ArgumentParser
from collections import defaultdict
from neo4j.v1 import GraphDatabase, basic_auth
from common import parse_connection_arguments, argument_session
def parse_args():
@ -24,16 +22,9 @@ def parse_args():
:return: parsed arguments
'''
parser = ArgumentParser(description=__doc__)
parser.add_argument('--endpoint', type=str, default='localhost:7687',
help='DBMS instance endpoint. '
'Bolt protocol is the only option.')
parser.add_argument('--username', type=str, default='neo4j',
help='DBMS instance username.')
parser.add_argument('--password', type=int, default='1234',
help='DBMS instance password.')
parser.add_argument('--ssl-enabled', action='store_false',
help="Is SSL enabled?")
parser = parse_connection_arguments()
# specific
parser.add_argument('--no-workers', type=int,
default=multiprocessing.cpu_count(),
help='Number of concurrent workers.')
@ -52,32 +43,6 @@ log = logging.getLogger(__name__)
args = parse_args()
@contextlib.contextmanager
def bolt_session(url, auth, ssl=False):
'''
with wrapper around Bolt session.
:param url: str, e.g. "bolt://localhost:7687"
:param auth: auth method, goes directly to the Bolt driver constructor
:param ssl: bool, is ssl enabled
'''
driver = GraphDatabase.driver(url, auth=auth, encrypted=ssl)
session = driver.session()
try:
yield session
finally:
session.close()
driver.close()
def argument_session():
'''
:return: Bolt session based on program arguments
'''
return bolt_session('bolt://' + args.endpoint,
basic_auth(args.username, args.password))
def create_worker(worker_id):
'''
Creates nodes and checks that all nodes were created.
@ -90,7 +55,7 @@ def create_worker(worker_id):
generated_xs = defaultdict(int)
create_query = ''
with argument_session() as session:
with argument_session(args) as session:
# create vertices
start_time = time.time()
for i in range(0, args.no_vertices):
@ -125,18 +90,17 @@ def create_handler():
Initializes processes and starts the execution.
'''
# instance cleanup
with argument_session() as session:
with argument_session(args) as session:
session.run("MATCH (n) DETACH DELETE n").consume()
# concurrent create execution & tests
with multiprocessing.Pool(args.no_workers) as p:
for worker_id, create_time, time_unit in \
p.map(create_worker, [i for i in range(args.no_workers)]):
log.info('Worker ID: %s; Create time: %s%s' %
(worker_id, create_time, time_unit))
# concurrent create execution & tests
with multiprocessing.Pool(args.no_workers) as p:
for worker_id, create_time, time_unit in \
p.map(create_worker, [i for i in range(args.no_workers)]):
log.info('Worker ID: %s; Create time: %s%s' %
(worker_id, create_time, time_unit))
# check total count
with argument_session() as session:
# check total count
expected_total_count = args.no_workers * args.no_vertices
total_count = session.run(
'MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt']