Tests - long-running test added

Summary:
Tests - long-running test added

 - common functionality refactor
 - long running tests WIP

tests/stress/common: minor refactor, long_running WIP

tests/stress/long_running - stable single-threaded, unstable concurrent

tests/stress/long_running - added error summary output

test/stress/long_running - add/remove vertex/edge stable multithreaded

test/stress/long_running - added vertex label testing

tests/stress/long_running - time limit added

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D489
This commit is contained in:
florijan 2017-06-14 13:42:06 +02:00
parent 01d9688273
commit a726ac0023
4 changed files with 487 additions and 67 deletions

View File

@ -13,8 +13,7 @@ namespace mvcc {
class SerializationError : public utils::BasicException {
static constexpr const char *default_message =
"Can't serialize due to\
concurrent operation(s)";
"Can't serialize due to concurrent operations";
public:
using utils::BasicException::BasicException;

41
tests/stress/bipartite.py Normal file → Executable file
View File

@ -9,9 +9,8 @@ import logging
import multiprocessing
import time
from common import parse_connection_arguments, argument_session, \
assert_equal, batch_rendered_strings, \
OutputData, execute_till_success
from common import connection_argument_parser, argument_session, assert_equal,\
OutputData, execute_till_success, batch, render
def parse_args():
@ -20,8 +19,7 @@ def parse_args():
:return: parsed arguments
'''
parser = parse_connection_arguments()
parser = connection_argument_parser()
parser.add_argument('--no-workers', type=int,
default=multiprocessing.cpu_count(),
help='Number of concurrent workers.')
@ -36,7 +34,6 @@ def parse_args():
parser.add_argument('--edge-batch-size', type=int, default=100,
help='Number of edges in a batch when edges '
'are created in batches.')
return parser.parse_args()
@ -57,23 +54,18 @@ def create_u_v_edges(u):
start_time = time.time()
with argument_session(args) as session:
no_failures = 0
match_u_query = 'MATCH (u:U {id: %s}) ' % u
match_u = 'MATCH (u:U {id: %d})' % u
if args.edge_batching:
# TODO: try to randomize execution, the execution time should
# be smaller, add randomize flag
for batchm, dps in batch_rendered_strings(
'MATCH (v%s:V {id: %s})',
[(i, i) for i in range(args.no_v)],
args.edge_batch_size):
for batchc, _ in batch_rendered_strings(
'CREATE (u)-[:R]->(v%s)',
[dpi for dpi, _ in dps],
args.edge_batch_size):
no_failures += execute_till_success(
session, match_u_query + batchm + batchc)
for v_id_batch in batch(range(args.no_v), args.edge_batch_size):
match_v = render(" MATCH (v{0}:V {{id: {0}}})", v_id_batch)
create_u = render(" CREATE (u)-[:R]->(v{0})", v_id_batch)
query = match_u + "".join(match_v) + "".join(create_u)
no_failures += execute_till_success(session, query)[1]
else:
no_failures += execute_till_success(
session, match_u_query + 'MATCH (v:V) CREATE (u)-[:R]->(v)')
session, match_u + ' MATCH (v:V) CREATE (u)-[:R]->(v)')[1]
end_time = time.time()
return u, end_time - start_time, "s", no_failures
@ -129,17 +121,14 @@ def execution_handler():
output_data.add_measurement("cleanup_time",
cleanup_end_time - start_time)
# create vertices
# create U vertices
for vertex_batch, _ in batch_rendered_strings('CREATE (:U {id: %s})',
range(args.no_u),
for b in batch(render('CREATE (:U {{id: {}}})', range(args.no_u)),
args.vertex_batch_size):
session.run(vertex_batch).consume()
session.run(" ".join(b)).consume()
# create V vertices
for vertex_batch, _ in batch_rendered_strings('CREATE (:V {id: %s})',
range(args.no_v),
for b in batch(render('CREATE (:V {{id: {}}})', range(args.no_v)),
args.vertex_batch_size):
session.run(vertex_batch).consume()
session.run(" ".join(b)).consume()
vertices_create_end_time = time.time()
output_data.add_measurement(
'vertices_create_time',
@ -214,4 +203,4 @@ if __name__ == '__main__':
output_data.add_status("edge_batching", args.edge_batching)
output_data.add_status("edge_batch_size", args.edge_batch_size)
execution_handler()
output_data.console_dump()
output_data.dump()

View File

@ -1,4 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
@ -9,6 +8,8 @@ Only Bolt communication protocol is supported.
'''
import contextlib
from threading import Thread
from time import sleep
from argparse import ArgumentParser
from neo4j.v1 import GraphDatabase, basic_auth
@ -43,59 +44,77 @@ class OutputData:
'''
self._statuses.append((name, status))
def console_dump(self):
def dump(self, print_f=print):
'''
Dumps output data on the console output.
Dumps output using the given ouput function.
Args:
print_f - the function that consumes ouptput. Defaults to
the 'print' function.
'''
print("Output data:")
print_f("Output data:")
for name, status in self._statuses:
print(" %s: %s" % (name, status))
print_f(" %s: %s" % (name, status))
for name, time, unit in self._measurements:
print(" %s: %s%s" % (name, time, unit))
print_f(" %s: %s%s" % (name, time, unit))
def execute_till_success(session, query):
def execute_till_success(session, query, max_retries=1000):
'''
Executes a query within Bolt session until the query is
successfully executed against the database.
Args:
session - the bolt session to execute the query with
query - str, the query to execute
max_retries - int, maximum allowed number of attempts
:param session: active Bolt session
:param query: query to execute
:return: int, number of failuers
:return: tuple (results_data_list, number_of_failures)
'''
no_failures = 0
try_again = True
while try_again:
while True:
try:
session.run(query).consume()
try_again = False
return session.run(query).data(), no_failures
except Exception:
no_failures += 1
return no_failures
if no_failures >= max_retries:
raise Exception("Query '%s' failed %d times, aborting" %
(query, max_retries))
def batch_rendered_strings(t, dps, bs=1):
'''
Batches rendered strings based on template and data points. Template is
populated from a single data point and than more rendered strings
are batched into a single string.
def batch(input, batch_size):
""" Batches the given input (must be iterable).
Supports input generators. Returns a generator.
All is lazy. The last batch can contain less elements
then `batch_size`, but is for sure more then zero.
:param t: str, template for the rendered string (for one data point)
:param dps, list or iterator with data points to populate the template
:param bs: int, batch size
Args:
input - iterable of elements
batch_size - number of elements in the batch
Return:
a generator that yields batches of elements.
"""
assert batch_size > 1, "Batch size must be greater then zero"
:returns: (str, batched dps (might be useful for further rendering))
e.g. if t = "test %s", dps = range(1, 6), bs = 2
yields are going to be:
"test 1 test 2", [1, 2]
"test 3 test 4", [3, 4]
"test 5" [5]
'''
no_dps = len(dps)
for ndx in range(0, no_dps, bs):
yield (' '.join([t % dp for dp in dps[ndx:min(ndx + bs, no_dps)]]),
dps[ndx:min(ndx + bs, no_dps)])
batch = []
for element in input:
batch.append(element)
if len(batch) >= batch_size:
yield batch
batch = []
if len(batch):
yield batch
def render(template, iterable_arguments):
"""
Calls template.format() for each given argument.
"""
for arguments in iterable_arguments:
yield template.format(arguments)
def assert_equal(expected, actual, message):
@ -111,7 +130,7 @@ def assert_equal(expected, actual, message):
assert expected == actual, message % (expected, actual)
def parse_connection_arguments():
def connection_argument_parser():
'''
Parses arguments related to establishing database connection like
host, port, username, etc.
@ -129,9 +148,6 @@ def parse_connection_arguments():
help='DBMS instance password.')
parser.add_argument('--ssl-enabled', action='store_false',
help="Is SSL enabled?")
parser.parse_known_args()
return parser
@ -155,7 +171,32 @@ def bolt_session(url, auth, ssl=False):
def argument_session(args):
'''
:return: Bolt session based on program arguments
:return: Bolt session context manager based on program arguments
'''
return bolt_session('bolt://' + args.endpoint,
basic_auth(args.username, args.password))
def argument_driver(args, ssl=False):
return GraphDatabase.driver(
'bolt://' + args.endpoint,
basic_auth=(args.username, args.password),
encrypted=ssl)
def periodically_execute(callable, args, interval, daemon=True):
"""
Periodically calls the given callable.
Args:
callable - the callable to call
args - arguments to pass to callable
interval - time (in seconds) between two calls
deamon - if the execution thread should be a daemon
"""
def periodic_call():
while True:
sleep(interval)
callable()
Thread(target=periodic_call, args=args, daemon=daemon).start()

391
tests/stress/long_running.py Executable file
View File

@ -0,0 +1,391 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
A long running test that performs
random CRUD ops on a bolt database.
Parameterized with vertex and edge counts around which
the graph state oscilates.
"""
import logging
import random
import time
from uuid import uuid4
from threading import Lock, Thread
from contextlib import contextmanager
from collections import defaultdict
import common
log = logging.getLogger(__name__)
def rint(upper_exclusive):
return random.randint(0, upper_exclusive - 1)
def bernoulli(p):
return random.random() < p
def random_id():
return str(uuid4())
class QueryExecutionSynchronizer():
"""
Fascilitates running a query with not other queries being
concurrently executed.
Exposes a count of how many queries in total have been
executed through `count_total`.
"""
def __init__(self, sleep_time=0.2):
"""
Args:
sleep_time - Sleep time while awaiting execution rights
"""
self.count_total = 0
self._lock = Lock()
self._count = 0
self._can_run = True
self._sleep_time = sleep_time
@contextmanager
def run(self):
"""
Provides a context for running a query without isolation.
Isolated queries can't be executed while such a context exists.
"""
while True:
with self._lock:
if self._can_run:
self._count += 1
self.count_total += 1
break
time.sleep(self._sleep_time)
try:
yield
finally:
with self._lock:
self._count -= 1
@contextmanager
def run_isolated(self):
"""
Provides a context for runnig a query with isolation. Prevents
new queries from executing. Waits till the currently executing
queries are done. Once this context exits execution can
continue.
"""
with self._lock:
self._can_run = False
while True:
with self._lock:
if self._count == 0:
break
time.sleep(self._sleep_time)
with self._lock:
try:
yield
finally:
self._can_run = True
class LabelCounter():
""" Encapsulates a label and a thread-safe counter """
def __init__(self, label):
self.label = label
self._count = 0
self._lock = Lock()
def increment(self):
with self._lock:
self._count += 1
def decrement(self):
with self._lock:
self._count -= 1
class ThreadSafeList():
""" Provides a thread-safe access to a list for a few functionalities. """
def __init__(self):
self._list = []
self._lock = Lock()
def append(self, element):
with self._lock:
self._list.append(element)
def remove(self, element):
with self._lock:
self._list.remove(element)
def random(self):
with self._lock:
return self._list[rint(len(self._list))]
def __len__(self):
with self._lock:
return len(self._list)
class Graph():
"""
Exposes functions for working on a graph, and tracks some
statistics about graph state.
"""
def __init__(self, vertex_count, edge_count, labels=5):
"""
Args:
vertex_count - int, desired vertex count
edge_count - int, desired edge count
labels - int, the number of labels to use
"""
# desired vertex and edge counts
self.vertex_count = vertex_count
self.edge_count = edge_count
self.query_execution_synchronizer = QueryExecutionSynchronizer()
# storage
self.edges = ThreadSafeList()
self.vertices = ThreadSafeList()
self.labels = {"label%d" % i: ThreadSafeList() for i in range(labels)}
# info about query failures, maps exception string representations into
# occurence counts
self._query_failure_counts = defaultdict(int)
self._query_failure_counts_lock = Lock()
def add_query_failure(self, reason):
with self._query_failure_counts_lock:
self._query_failure_counts[reason] += 1
def query_failures(self):
with self._query_failure_counts_lock:
return dict(self._query_failure_counts)
class GraphSession():
"""
Encapsulates a Graph and a Bolt session and provides CRUD op functions.
Also defines a run-loop for a generic exectutor, and a graph state
verification function.
"""
def __init__(self, graph, session):
self.graph = graph
self.session = session
self._start_time = time.time()
@property
def v(self):
return self.graph.vertices
@property
def e(self):
return self.graph.edges
def execute_basic(self, query):
log.debug("Executing query: %s", query)
try:
return self.session.run(query).data()
except Exception as e:
self.graph.add_query_failure(str(e))
return None
def execute(self, query):
with self.graph.query_execution_synchronizer.run():
return self.execute_basic(query)
def create_vertex(self):
vertex_id = random_id()
self.execute("CREATE ({id: %r})" % vertex_id)
self.v.append(vertex_id)
def remove_vertex(self):
vertex_id = self.v.random()
result = self.execute(
"MATCH (n {id: %r}) OPTIONAL MATCH (n)-[r]-() "
"DETACH DELETE n RETURN n.id, labels(n), r.id" % vertex_id)
if result:
process_vertex_ids = set()
for row in result:
# remove vertex but note there could be duplicates
vertex_id = row['n.id']
if vertex_id not in process_vertex_ids:
process_vertex_ids.add(vertex_id)
self.v.remove(vertex_id)
for label in row['labels(n)']:
self.graph.labels[label].remove(vertex_id)
# remove edge
edge_id = row['r.id']
if edge_id:
self.e.remove(edge_id)
def create_edge(self):
eid = random_id()
creation = self.execute(
"MATCH (from {id: %r}), (to {id: %r}) "
"CREATE (from)-[e:EdgeType {id: %r}]->(to) RETURN e" % (
self.v.random(), self.v.random(), eid))
if creation:
self.e.append(eid)
def remove_edge(self):
edge_id = self.e.random()
result = self.execute(
"MATCH ()-[e {id: %r}]->() DELETE e RETURN e.id" % edge_id)
if result:
self.e.remove(edge_id)
def add_label(self):
vertex_id = self.v.random()
label = random.choice(list(self.graph.labels.keys()))
# add a label on a vertex that didn't have that label
# yet (we need that for book-keeping)
result = self.execute(
"MATCH (v {id: %r}) WHERE not v:%s SET v:%s RETURN v.id" % (
vertex_id, label, label))
if result:
self.graph.labels[label].append(vertex_id)
def verify_graph(self):
""" Checks if the local info corresponds to DB state """
def test(a, b, message):
assert set(a) == set(b), message % (len(a), len(b))
def get(query, key):
return [row[key] for row in self.execute_basic(query)]
# graph state verification must be run in isolation
with self.graph.query_execution_synchronizer.run_isolated():
test(self.v._list, get("MATCH (n) RETURN n.id", "n.id"),
"Expected %d vertices, found %d")
test(self.e._list, get("MATCH ()-[r]->() RETURN r.id", "r.id"),
"Expected %d edges, found %d")
for lab, exp in self.graph.labels.items():
test(get("MATCH (n:%s) RETURN n.id" % lab, "n.id"), exp._list,
"Expected %d vertices with label '{}', found %d".format(
lab))
log.info("Graph verification success:")
log.info("\tExecuted %d queries in %.2f seconds",
self.graph.query_execution_synchronizer.count_total,
time.time() - self._start_time)
log.info("\tGraph has %d vertices and %d edges",
len(self.v), len(self.e))
for label in sorted(self.graph.labels.keys()):
log.info("\tVertices with label '%s': %d",
label, len(self.graph.labels[label]))
failures = self.graph.query_failures()
if failures:
log.info("\tQuery failed (reason: count)")
for reason, count in failures.items():
log.info("\t\t'%s': %d", reason, count)
def run_loop(self, query_count, max_time):
start_time = time.time()
for _ in range(query_count):
if (time.time() - start_time) / 60 > max_time:
break
ratio_e = len(self.e) / self.graph.edge_count
ratio_v = len(self.v) / self.graph.vertex_count
# prefer adding/removing edges whenever there is an edge
# disbalance and there is enough vertices
if ratio_v > 0.5 and abs(1 - ratio_e) > 0.2:
if bernoulli(ratio_e / 2.0):
self.remove_edge()
else:
self.create_edge()
continue
# if we are near vertex balance, we can also do updates
# instad of update / deletes
if abs(1 - ratio_v) < 0.5 and bernoulli(0.5):
self.add_label()
continue
if bernoulli(ratio_v / 2.0):
self.remove_vertex()
else:
self.create_vertex()
def parse_args():
argp = common.connection_argument_parser()
argp.add_argument("--logging", default="INFO",
choices=["INFO", "DEBUG", "WARNING", "ERROR"],
help="Logging level")
argp.add_argument("--vertex-count", type=int, required=True,
help="The average number of vertices in the graph")
argp.add_argument("--edge-count", type=int, required=True,
help="The average number of edges in the graph")
argp.add_argument("--prop-count", type=int, default=5,
help="The max number of properties on a node")
argp.add_argument("--max-queries", type=int, default=2 ** 30,
help="Maximum number of queries to execute")
argp.add_argument("--max-time", type=int, default=2 ** 30,
help="Maximum execution time in minutes")
argp.add_argument("--verify", type=int, default=0,
help="Interval (seconds) between checking local info")
argp.add_argument("--thread-count", type=int, default=1,
help="The number of threads that operate on the graph"
"independently")
return argp.parse_args()
def main():
args = parse_args()
if args.logging:
logging.basicConfig(level=args.logging)
logging.getLogger("neo4j").setLevel(logging.WARNING)
log.info("Starting Memgraph long running test")
graph = Graph(args.vertex_count, args.edge_count)
driver = common.argument_driver(args)
# cleanup
driver.session().run("MATCH (n) DETACH DELETE n").consume()
if args.verify > 0:
log.info("Creating veification session")
verififaction_session = GraphSession(graph, driver.session())
common.periodically_execute(verififaction_session.verify_graph, (),
args.verify)
# TODO better verification failure handling
threads = []
for _ in range(args.thread_count):
log.info("Creating query runner thread")
session = GraphSession(graph, driver.session())
thread = Thread(target=session.run_loop,
args=(args.max_queries // args.thread_count,
args.max_time),
daemon=True)
threads.append(thread)
list(map(Thread.start, threads))
list(map(Thread.join, threads))
driver.close()
log.info("All query runners done")
if __name__ == '__main__':
main()