memgraph/tests/stress/long_running.py

398 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
A long running test that performs
random CRUD ops on a bolt database.
Parameterized with vertex and edge counts around which
the graph state oscilates.
"""
import logging
import random
import time
from uuid import uuid4
from threading import Lock, Thread
from contextlib import contextmanager
from collections import defaultdict
import common
log = logging.getLogger(__name__)
# the label in the database that is indexed
# used for matching vertices faster
INDEXED_LABEL = "indexed_label"
def rint(upper_exclusive):
return random.randint(0, upper_exclusive - 1)
def bernoulli(p):
return random.random() < p
def random_id():
return str(uuid4())
class QueryExecutionSynchronizer():
"""
Fascilitates running a query with not other queries being
concurrently executed.
Exposes a count of how many queries in total have been
executed through `count_total`.
"""
def __init__(self, sleep_time=0.2):
"""
Args:
sleep_time - Sleep time while awaiting execution rights
"""
self.count_total = 0
self._lock = Lock()
self._count = 0
self._can_run = True
self._sleep_time = sleep_time
@contextmanager
def run(self):
"""
Provides a context for running a query without isolation.
Isolated queries can't be executed while such a context exists.
"""
while True:
with self._lock:
if self._can_run:
self._count += 1
self.count_total += 1
break
time.sleep(self._sleep_time)
try:
yield
finally:
with self._lock:
self._count -= 1
@contextmanager
def run_isolated(self):
"""
Provides a context for runnig a query with isolation. Prevents
new queries from executing. Waits till the currently executing
queries are done. Once this context exits execution can
continue.
"""
with self._lock:
self._can_run = False
while True:
with self._lock:
if self._count == 0:
break
time.sleep(self._sleep_time)
with self._lock:
try:
yield
finally:
self._can_run = True
class LabelCounter():
""" Encapsulates a label and a thread-safe counter """
def __init__(self, label):
self.label = label
self._count = 0
self._lock = Lock()
def increment(self):
with self._lock:
self._count += 1
def decrement(self):
with self._lock:
self._count -= 1
class ThreadSafeList():
""" Provides a thread-safe access to a list for a few functionalities. """
def __init__(self):
self._list = []
self._lock = Lock()
def append(self, element):
with self._lock:
self._list.append(element)
def remove(self, element):
with self._lock:
self._list.remove(element)
def random(self):
with self._lock:
return self._list[rint(len(self._list))]
def __len__(self):
with self._lock:
return len(self._list)
class Graph():
"""
Exposes functions for working on a graph, and tracks some
statistics about graph state.
"""
def __init__(self, vertex_count, edge_count, labels=5):
"""
Args:
vertex_count - int, desired vertex count
edge_count - int, desired edge count
labels - int, the number of labels to use
"""
# desired vertex and edge counts
self.vertex_count = vertex_count
self.edge_count = edge_count
self.query_execution_synchronizer = QueryExecutionSynchronizer()
# storage
self.edges = ThreadSafeList()
self.vertices = ThreadSafeList()
self.labels = {"label%d" % i: ThreadSafeList() for i in range(labels)}
# info about query failures, maps exception string representations into
# occurence counts
self._query_failure_counts = defaultdict(int)
self._query_failure_counts_lock = Lock()
def add_query_failure(self, reason):
with self._query_failure_counts_lock:
self._query_failure_counts[reason] += 1
def query_failures(self):
with self._query_failure_counts_lock:
return dict(self._query_failure_counts)
class GraphSession():
"""
Encapsulates a Graph and a Bolt session and provides CRUD op functions.
Also defines a run-loop for a generic exectutor, and a graph state
verification function.
"""
def __init__(self, graph, session):
self.graph = graph
self.session = session
self._start_time = time.time()
@property
def v(self):
return self.graph.vertices
@property
def e(self):
return self.graph.edges
def execute_basic(self, query):
log.debug("Executing query: %s", query)
try:
return self.session.run(query).data()
except Exception as e:
self.graph.add_query_failure(str(e))
return None
def execute(self, query):
with self.graph.query_execution_synchronizer.run():
return self.execute_basic(query)
def create_vertex(self):
vertex_id = random_id()
self.execute("CREATE (:%s {id: %r})" % (INDEXED_LABEL, vertex_id))
self.v.append(vertex_id)
def remove_vertex(self):
vertex_id = self.v.random()
result = self.execute(
"MATCH (n:%s {id: %r}) OPTIONAL MATCH (n)-[r]-() "
"DETACH DELETE n RETURN n.id, labels(n), r.id" % (INDEXED_LABEL, vertex_id))
if result:
process_vertex_ids = set()
for row in result:
# remove vertex but note there could be duplicates
vertex_id = row['n.id']
if vertex_id not in process_vertex_ids:
process_vertex_ids.add(vertex_id)
self.v.remove(vertex_id)
for label in row['labels(n)']:
if (label != INDEXED_LABEL):
self.graph.labels[label].remove(vertex_id)
# remove edge
edge_id = row['r.id']
if edge_id:
self.e.remove(edge_id)
def create_edge(self):
eid = random_id()
creation = self.execute(
"MATCH (from:%s {id: %r}), (to:%s {id: %r}) "
"CREATE (from)-[e:EdgeType {id: %r}]->(to) RETURN e" % (
INDEXED_LABEL, self.v.random(), INDEXED_LABEL, self.v.random(), eid))
if creation:
self.e.append(eid)
def remove_edge(self):
edge_id = self.e.random()
result = self.execute(
"MATCH ()-[e {id: %r}]->() DELETE e RETURN e.id" % edge_id)
if result:
self.e.remove(edge_id)
def add_label(self):
vertex_id = self.v.random()
label = random.choice(list(self.graph.labels.keys()))
# add a label on a vertex that didn't have that label
# yet (we need that for book-keeping)
result = self.execute(
"MATCH (v {id: %r}) WHERE not v:%s SET v:%s RETURN v.id" % (
vertex_id, label, label))
if result:
self.graph.labels[label].append(vertex_id)
def verify_graph(self):
""" Checks if the local info corresponds to DB state """
def test(a, b, message):
assert set(a) == set(b), message % (len(a), len(b))
def get(query, key):
return [row[key] for row in self.execute_basic(query)]
# graph state verification must be run in isolation
with self.graph.query_execution_synchronizer.run_isolated():
test(self.v._list, get("MATCH (n) RETURN n.id", "n.id"),
"Expected %d vertices, found %d")
test(self.e._list, get("MATCH ()-[r]->() RETURN r.id", "r.id"),
"Expected %d edges, found %d")
for lab, exp in self.graph.labels.items():
test(get("MATCH (n:%s) RETURN n.id" % lab, "n.id"), exp._list,
"Expected %d vertices with label '{}', found %d".format(
lab))
log.info("Graph verification success:")
log.info("\tExecuted %d queries in %.2f seconds",
self.graph.query_execution_synchronizer.count_total,
time.time() - self._start_time)
log.info("\tGraph has %d vertices and %d edges",
len(self.v), len(self.e))
for label in sorted(self.graph.labels.keys()):
log.info("\tVertices with label '%s': %d",
label, len(self.graph.labels[label]))
failures = self.graph.query_failures()
if failures:
log.info("\tQuery failed (reason: count)")
for reason, count in failures.items():
log.info("\t\t'%s': %d", reason, count)
def run_loop(self, query_count, max_time):
start_time = time.time()
for _ in range(query_count):
if (time.time() - start_time) / 60 > max_time:
break
ratio_e = len(self.e) / self.graph.edge_count
ratio_v = len(self.v) / self.graph.vertex_count
# prefer adding/removing edges whenever there is an edge
# disbalance and there is enough vertices
if ratio_v > 0.5 and abs(1 - ratio_e) > 0.2:
if bernoulli(ratio_e / 2.0):
self.remove_edge()
else:
self.create_edge()
continue
# if we are near vertex balance, we can also do updates
# instad of update / deletes
if abs(1 - ratio_v) < 0.5 and bernoulli(0.5):
self.add_label()
continue
if bernoulli(ratio_v / 2.0):
self.remove_vertex()
else:
self.create_vertex()
def parse_args():
argp = common.connection_argument_parser()
argp.add_argument("--logging", default="INFO",
choices=["INFO", "DEBUG", "WARNING", "ERROR"],
help="Logging level")
argp.add_argument("--vertex-count", type=int, required=True,
help="The average number of vertices in the graph")
argp.add_argument("--edge-count", type=int, required=True,
help="The average number of edges in the graph")
argp.add_argument("--prop-count", type=int, default=5,
help="The max number of properties on a node")
argp.add_argument("--max-queries", type=int, default=2 ** 30,
help="Maximum number of queries to execute")
argp.add_argument("--max-time", type=int, default=2 ** 30,
help="Maximum execution time in minutes")
argp.add_argument("--verify", type=int, default=0,
help="Interval (seconds) between checking local info")
argp.add_argument("--thread-count", type=int, default=1,
help="The number of threads that operate on the graph"
"independently")
return argp.parse_args()
def main():
args = parse_args()
if args.logging:
logging.basicConfig(level=args.logging)
logging.getLogger("neo4j").setLevel(logging.WARNING)
log.info("Starting Memgraph long running test")
graph = Graph(args.vertex_count, args.edge_count)
driver = common.argument_driver(args)
# cleanup
driver.session().run("MATCH (n) DETACH DELETE n").consume()
driver.session().run("CREATE INDEX ON :%s(id)" % INDEXED_LABEL).consume()
if args.verify > 0:
log.info("Creating veification session")
verififaction_session = GraphSession(graph, driver.session())
common.periodically_execute(verififaction_session.verify_graph, (),
args.verify)
# TODO better verification failure handling
threads = []
for _ in range(args.thread_count):
log.info("Creating query runner thread")
session = GraphSession(graph, driver.session())
thread = Thread(target=session.run_loop,
args=(args.max_queries // args.thread_count,
args.max_time),
daemon=True)
threads.append(thread)
list(map(Thread.start, threads))
list(map(Thread.join, threads))
driver.close()
log.info("All query runners done")
if __name__ == '__main__':
main()