diff --git a/src/mvcc/version_list.hpp b/src/mvcc/version_list.hpp
index ad6103d10..06414aa2c 100644
--- a/src/mvcc/version_list.hpp
+++ b/src/mvcc/version_list.hpp
@@ -13,8 +13,7 @@ namespace mvcc {
 
 class SerializationError : public utils::BasicException {
   static constexpr const char *default_message =
-      "Can't serialize due to\
-      concurrent operation(s)";
+      "Can't serialize due to concurrent operations";
 
  public:
   using utils::BasicException::BasicException;
diff --git a/tests/stress/bipartite.py b/tests/stress/bipartite.py
old mode 100644
new mode 100755
index d91bcc785..62ecc75b8
--- a/tests/stress/bipartite.py
+++ b/tests/stress/bipartite.py
@@ -9,9 +9,8 @@ import logging
 import multiprocessing
 import time
 
-from common import parse_connection_arguments, argument_session, \
-    assert_equal, batch_rendered_strings, \
-    OutputData, execute_till_success
+from common import connection_argument_parser, argument_session, assert_equal,\
+    OutputData, execute_till_success, batch, render
 
 
 def parse_args():
@@ -20,8 +19,7 @@ def parse_args():
 
     :return: parsed arguments
     '''
-    parser = parse_connection_arguments()
-
+    parser = connection_argument_parser()
     parser.add_argument('--no-workers', type=int,
                         default=multiprocessing.cpu_count(),
                         help='Number of concurrent workers.')
@@ -36,7 +34,6 @@ def parse_args():
     parser.add_argument('--edge-batch-size', type=int, default=100,
                         help='Number of edges in a batch when edges '
                              'are created in batches.')
-
    return parser.parse_args()
@@ -57,23 +54,18 @@ def create_u_v_edges(u):
     start_time = time.time()
     with argument_session(args) as session:
         no_failures = 0
-        match_u_query = 'MATCH (u:U {id: %s}) ' % u
+        match_u = 'MATCH (u:U {id: %d})' % u
         if args.edge_batching:
             # TODO: try to randomize execution, the execution time should
             # be smaller, add randomize flag
-            for batchm, dps in batch_rendered_strings(
-                    'MATCH (v%s:V {id: %s})',
-                    [(i, i) for i in range(args.no_v)],
-                    args.edge_batch_size):
-                for batchc, _ in batch_rendered_strings(
-                        'CREATE (u)-[:R]->(v%s)',
-                        [dpi for dpi, _ in dps],
-                        args.edge_batch_size):
-                    no_failures += execute_till_success(
-                        session, match_u_query + batchm + batchc)
+            for v_id_batch in batch(range(args.no_v), args.edge_batch_size):
+                match_v = render(" MATCH (v{0}:V {{id: {0}}})", v_id_batch)
+                create_u = render(" CREATE (u)-[:R]->(v{0})", v_id_batch)
+                query = match_u + "".join(match_v) + "".join(create_u)
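+                # Illustration (comment only, not executed): for u = 0 and
+                # a v id batch [0, 1] the composed query is:
+                #   MATCH (u:U {id: 0}) MATCH (v0:V {id: 0})
+                #   MATCH (v1:V {id: 1})
+                #   CREATE (u)-[:R]->(v0) CREATE (u)-[:R]->(v1)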
+                no_failures += execute_till_success(session, query)[1]
         else:
             no_failures += execute_till_success(
-                session, match_u_query + 'MATCH (v:V) CREATE (u)-[:R]->(v)')
+                session, match_u + ' MATCH (v:V) CREATE (u)-[:R]->(v)')[1]
     end_time = time.time()
     return u, end_time - start_time, "s", no_failures
@@ -129,17 +121,14 @@ def execution_handler():
         output_data.add_measurement("cleanup_time",
                                     cleanup_end_time - start_time)
-        # create vertices
         # create U vertices
-        for vertex_batch, _ in batch_rendered_strings('CREATE (:U {id: %s})',
-                                                      range(args.no_u),
-                                                      args.vertex_batch_size):
-            session.run(vertex_batch).consume()
+        for b in batch(render('CREATE (:U {{id: {}}})', range(args.no_u)),
+                       args.vertex_batch_size):
+            session.run(" ".join(b)).consume()
         # create V vertices
-        for vertex_batch, _ in batch_rendered_strings('CREATE (:V {id: %s})',
-                                                      range(args.no_v),
-                                                      args.vertex_batch_size):
-            session.run(vertex_batch).consume()
+        for b in batch(render('CREATE (:V {{id: {}}})', range(args.no_v)),
+                       args.vertex_batch_size):
+            session.run(" ".join(b)).consume()
 
         vertices_create_end_time = time.time()
         output_data.add_measurement(
             'vertices_create_time',
@@ -214,4 +203,4 @@ if __name__ == '__main__':
     output_data.add_status("edge_batching", args.edge_batching)
     output_data.add_status("edge_batch_size", args.edge_batch_size)
     execution_handler()
-    output_data.console_dump()
+    output_data.dump()
diff --git a/tests/stress/common.py b/tests/stress/common.py
index 42acccd37..f145211c1 100644
--- a/tests/stress/common.py
+++ b/tests/stress/common.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
 '''
@@ -9,6 +8,8 @@ Only Bolt communication protocol is supported.
 '''
 
 import contextlib
+from threading import Thread
+from time import sleep
 from argparse import ArgumentParser
 
 from neo4j.v1 import GraphDatabase, basic_auth
@@ -43,59 +44,77 @@ class OutputData:
         '''
         self._statuses.append((name, status))
 
-    def console_dump(self):
+    def dump(self, print_f=print):
         '''
-        Dumps output data on the console output.
+        Dumps output using the given output function.
+
+        Args:
+            print_f - the function that consumes output. Defaults to
+                the 'print' function.
         '''
-        print("Output data:")
+        print_f("Output data:")
         for name, status in self._statuses:
-            print("    %s: %s" % (name, status))
+            print_f("    %s: %s" % (name, status))
         for name, time, unit in self._measurements:
-            print("    %s: %s%s" % (name, time, unit))
+            print_f("    %s: %s%s" % (name, time, unit))
 
 
-def execute_till_success(session, query):
+def execute_till_success(session, query, max_retries=1000):
     '''
     Executes a query within Bolt session until the query is
     successfully executed against the database.
 
     :param session: active Bolt session
     :param query: query to execute
-    :return: int, number of failuers
+    :param max_retries: int, maximum allowed number of attempts
+    :return: tuple (results_data_list, number_of_failures)
     '''
     no_failures = 0
-    try_again = True
-    while try_again:
+    while True:
         try:
-            session.run(query).consume()
-            try_again = False
+            return session.run(query).data(), no_failures
         except Exception:
             no_failures += 1
-    return no_failures
+            if no_failures >= max_retries:
+                raise Exception("Query '%s' failed %d times, aborting" %
+                                (query, max_retries))
 
 
-def batch_rendered_strings(t, dps, bs=1):
-    '''
-    Batches rendered strings based on template and data points. Template is
-    populated from a single data point and than more rendered strings
-    are batched into a single string.
+def batch(input, batch_size):
+    """ Batches the given input (must be iterable).
+    Supports input generators. Returns a generator.
+    Everything is evaluated lazily. The last batch can contain fewer
+    elements than `batch_size`, but always contains more than zero.
 
-    :param t: str, template for the rendered string (for one data point)
-    :param dps, list or iterator with data points to populate the template
-    :param bs: int, batch size
+    Args:
+        input - iterable of elements
+        batch_size - number of elements in the batch
+    Return:
+        a generator that yields batches of elements.
+    """
+    assert batch_size > 0, "Batch size must be greater than zero"
 
-    :returns: (str, batched dps (might be useful for further rendering))
-        e.g. if t = "test %s", dps = range(1, 6), bs = 2
-        yields are going to be:
-            "test 1 test 2", [1, 2]
-            "test 3 test 4", [3, 4]
-            "test 5" [5]
-    '''
-    no_dps = len(dps)
-    for ndx in range(0, no_dps, bs):
-        yield (' '.join([t % dp for dp in dps[ndx:min(ndx + bs, no_dps)]]),
-               dps[ndx:min(ndx + bs, no_dps)])
+    batch = []
+    for element in input:
+        batch.append(element)
+        if len(batch) >= batch_size:
+            yield batch
+            batch = []
+    if len(batch):
+        yield batch
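+
+# Illustration of `batch` semantics (comment only, not executed):
+#   list(batch(range(5), 2)) == [[0, 1], [2, 3], [4]]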
if t = "test %s", dps = range(1, 6), bs = 2 - yields are going to be: - "test 1 test 2", [1, 2] - "test 3 test 4", [3, 4] - "test 5" [5] - ''' - no_dps = len(dps) - for ndx in range(0, no_dps, bs): - yield (' '.join([t % dp for dp in dps[ndx:min(ndx + bs, no_dps)]]), - dps[ndx:min(ndx + bs, no_dps)]) + batch = [] + for element in input: + batch.append(element) + if len(batch) >= batch_size: + yield batch + batch = [] + if len(batch): + yield batch + + +def render(template, iterable_arguments): + """ + Calls template.format() for each given argument. + """ + for arguments in iterable_arguments: + yield template.format(arguments) def assert_equal(expected, actual, message): @@ -111,7 +130,7 @@ def assert_equal(expected, actual, message): assert expected == actual, message % (expected, actual) -def parse_connection_arguments(): +def connection_argument_parser(): ''' Parses arguments related to establishing database connection like host, port, username, etc. @@ -129,9 +148,6 @@ def parse_connection_arguments(): help='DBMS instance password.') parser.add_argument('--ssl-enabled', action='store_false', help="Is SSL enabled?") - - parser.parse_known_args() - return parser @@ -155,7 +171,32 @@ def bolt_session(url, auth, ssl=False): def argument_session(args): ''' - :return: Bolt session based on program arguments + :return: Bolt session context manager based on program arguments ''' return bolt_session('bolt://' + args.endpoint, basic_auth(args.username, args.password)) + + +def argument_driver(args, ssl=False): + return GraphDatabase.driver( + 'bolt://' + args.endpoint, + basic_auth=(args.username, args.password), + encrypted=ssl) + + +def periodically_execute(callable, args, interval, daemon=True): + """ + Periodically calls the given callable. + + Args: + callable - the callable to call + args - arguments to pass to callable + interval - time (in seconds) between two calls + deamon - if the execution thread should be a daemon + """ + def periodic_call(): + while True: + sleep(interval) + callable() + + Thread(target=periodic_call, args=args, daemon=daemon).start() diff --git a/tests/stress/long_running.py b/tests/stress/long_running.py new file mode 100755 index 000000000..932fde858 --- /dev/null +++ b/tests/stress/long_running.py @@ -0,0 +1,391 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +A long running test that performs +random CRUD ops on a bolt database. + +Parameterized with vertex and edge counts around which +the graph state oscilates. +""" + +import logging +import random +import time +from uuid import uuid4 +from threading import Lock, Thread +from contextlib import contextmanager +from collections import defaultdict + +import common + +log = logging.getLogger(__name__) + + +def rint(upper_exclusive): + return random.randint(0, upper_exclusive - 1) + + +def bernoulli(p): + return random.random() < p + + +def random_id(): + return str(uuid4()) + + +class QueryExecutionSynchronizer(): + """ + Fascilitates running a query with not other queries being + concurrently executed. + + Exposes a count of how many queries in total have been + executed through `count_total`. + """ + + def __init__(self, sleep_time=0.2): + """ + Args: + sleep_time - Sleep time while awaiting execution rights + """ + self.count_total = 0 + + self._lock = Lock() + self._count = 0 + self._can_run = True + self._sleep_time = sleep_time + + @contextmanager + def run(self): + """ + Provides a context for running a query without isolation. + Isolated queries can't be executed while such a context exists. 
+ """ + while True: + with self._lock: + if self._can_run: + self._count += 1 + self.count_total += 1 + break + time.sleep(self._sleep_time) + + try: + yield + finally: + with self._lock: + self._count -= 1 + + @contextmanager + def run_isolated(self): + """ + Provides a context for runnig a query with isolation. Prevents + new queries from executing. Waits till the currently executing + queries are done. Once this context exits execution can + continue. + """ + with self._lock: + self._can_run = False + + while True: + with self._lock: + if self._count == 0: + break + time.sleep(self._sleep_time) + + with self._lock: + try: + yield + finally: + self._can_run = True + + +class LabelCounter(): + """ Encapsulates a label and a thread-safe counter """ + + def __init__(self, label): + self.label = label + self._count = 0 + self._lock = Lock() + + def increment(self): + with self._lock: + self._count += 1 + + def decrement(self): + with self._lock: + self._count -= 1 + + +class ThreadSafeList(): + """ Provides a thread-safe access to a list for a few functionalities. """ + + def __init__(self): + self._list = [] + self._lock = Lock() + + def append(self, element): + with self._lock: + self._list.append(element) + + def remove(self, element): + with self._lock: + self._list.remove(element) + + def random(self): + with self._lock: + return self._list[rint(len(self._list))] + + def __len__(self): + with self._lock: + return len(self._list) + + +class Graph(): + """ + Exposes functions for working on a graph, and tracks some + statistics about graph state. + """ + + def __init__(self, vertex_count, edge_count, labels=5): + """ + Args: + vertex_count - int, desired vertex count + edge_count - int, desired edge count + labels - int, the number of labels to use + """ + + # desired vertex and edge counts + self.vertex_count = vertex_count + self.edge_count = edge_count + + self.query_execution_synchronizer = QueryExecutionSynchronizer() + + # storage + self.edges = ThreadSafeList() + self.vertices = ThreadSafeList() + self.labels = {"label%d" % i: ThreadSafeList() for i in range(labels)} + + # info about query failures, maps exception string representations into + # occurence counts + self._query_failure_counts = defaultdict(int) + self._query_failure_counts_lock = Lock() + + def add_query_failure(self, reason): + with self._query_failure_counts_lock: + self._query_failure_counts[reason] += 1 + + def query_failures(self): + with self._query_failure_counts_lock: + return dict(self._query_failure_counts) + + +class GraphSession(): + """ + Encapsulates a Graph and a Bolt session and provides CRUD op functions. + Also defines a run-loop for a generic exectutor, and a graph state + verification function. 
+ """ + + def __init__(self, graph, session): + self.graph = graph + self.session = session + self._start_time = time.time() + + @property + def v(self): + return self.graph.vertices + + @property + def e(self): + return self.graph.edges + + def execute_basic(self, query): + log.debug("Executing query: %s", query) + try: + return self.session.run(query).data() + except Exception as e: + self.graph.add_query_failure(str(e)) + return None + + def execute(self, query): + with self.graph.query_execution_synchronizer.run(): + return self.execute_basic(query) + + def create_vertex(self): + vertex_id = random_id() + self.execute("CREATE ({id: %r})" % vertex_id) + self.v.append(vertex_id) + + def remove_vertex(self): + vertex_id = self.v.random() + result = self.execute( + "MATCH (n {id: %r}) OPTIONAL MATCH (n)-[r]-() " + "DETACH DELETE n RETURN n.id, labels(n), r.id" % vertex_id) + if result: + process_vertex_ids = set() + for row in result: + # remove vertex but note there could be duplicates + vertex_id = row['n.id'] + if vertex_id not in process_vertex_ids: + process_vertex_ids.add(vertex_id) + self.v.remove(vertex_id) + for label in row['labels(n)']: + self.graph.labels[label].remove(vertex_id) + # remove edge + edge_id = row['r.id'] + if edge_id: + self.e.remove(edge_id) + + def create_edge(self): + eid = random_id() + creation = self.execute( + "MATCH (from {id: %r}), (to {id: %r}) " + "CREATE (from)-[e:EdgeType {id: %r}]->(to) RETURN e" % ( + self.v.random(), self.v.random(), eid)) + if creation: + self.e.append(eid) + + def remove_edge(self): + edge_id = self.e.random() + result = self.execute( + "MATCH ()-[e {id: %r}]->() DELETE e RETURN e.id" % edge_id) + if result: + self.e.remove(edge_id) + + def add_label(self): + vertex_id = self.v.random() + label = random.choice(list(self.graph.labels.keys())) + # add a label on a vertex that didn't have that label + # yet (we need that for book-keeping) + result = self.execute( + "MATCH (v {id: %r}) WHERE not v:%s SET v:%s RETURN v.id" % ( + vertex_id, label, label)) + if result: + self.graph.labels[label].append(vertex_id) + + def verify_graph(self): + """ Checks if the local info corresponds to DB state """ + + def test(a, b, message): + assert set(a) == set(b), message % (len(a), len(b)) + + def get(query, key): + return [row[key] for row in self.execute_basic(query)] + + # graph state verification must be run in isolation + with self.graph.query_execution_synchronizer.run_isolated(): + test(self.v._list, get("MATCH (n) RETURN n.id", "n.id"), + "Expected %d vertices, found %d") + test(self.e._list, get("MATCH ()-[r]->() RETURN r.id", "r.id"), + "Expected %d edges, found %d") + for lab, exp in self.graph.labels.items(): + test(get("MATCH (n:%s) RETURN n.id" % lab, "n.id"), exp._list, + "Expected %d vertices with label '{}', found %d".format( + lab)) + + log.info("Graph verification success:") + log.info("\tExecuted %d queries in %.2f seconds", + self.graph.query_execution_synchronizer.count_total, + time.time() - self._start_time) + log.info("\tGraph has %d vertices and %d edges", + len(self.v), len(self.e)) + for label in sorted(self.graph.labels.keys()): + log.info("\tVertices with label '%s': %d", + label, len(self.graph.labels[label])) + failures = self.graph.query_failures() + if failures: + log.info("\tQuery failed (reason: count)") + for reason, count in failures.items(): + log.info("\t\t'%s': %d", reason, count) + + def run_loop(self, query_count, max_time): + start_time = time.time() + for _ in range(query_count): + if (time.time() - 
+
+
+class LabelCounter():
+    """ Encapsulates a label and a thread-safe counter """
+
+    def __init__(self, label):
+        self.label = label
+        self._count = 0
+        self._lock = Lock()
+
+    def increment(self):
+        with self._lock:
+            self._count += 1
+
+    def decrement(self):
+        with self._lock:
+            self._count -= 1
+
+
+class ThreadSafeList():
+    """ Provides thread-safe access to a list for a few operations. """
+
+    def __init__(self):
+        self._list = []
+        self._lock = Lock()
+
+    def append(self, element):
+        with self._lock:
+            self._list.append(element)
+
+    def remove(self, element):
+        with self._lock:
+            self._list.remove(element)
+
+    def random(self):
+        with self._lock:
+            return self._list[rint(len(self._list))]
+
+    def __len__(self):
+        with self._lock:
+            return len(self._list)
+
+
+class Graph():
+    """
+    Exposes functions for working on a graph, and tracks some
+    statistics about graph state.
+    """
+
+    def __init__(self, vertex_count, edge_count, labels=5):
+        """
+        Args:
+            vertex_count - int, desired vertex count
+            edge_count - int, desired edge count
+            labels - int, the number of labels to use
+        """
+
+        # desired vertex and edge counts
+        self.vertex_count = vertex_count
+        self.edge_count = edge_count
+
+        self.query_execution_synchronizer = QueryExecutionSynchronizer()
+
+        # storage
+        self.edges = ThreadSafeList()
+        self.vertices = ThreadSafeList()
+        self.labels = {"label%d" % i: ThreadSafeList() for i in range(labels)}
+
+        # info about query failures, maps exception string representations
+        # into occurrence counts
+        self._query_failure_counts = defaultdict(int)
+        self._query_failure_counts_lock = Lock()
+
+    def add_query_failure(self, reason):
+        with self._query_failure_counts_lock:
+            self._query_failure_counts[reason] += 1
+
+    def query_failures(self):
+        with self._query_failure_counts_lock:
+            return dict(self._query_failure_counts)
+
+
+class GraphSession():
+    """
+    Encapsulates a Graph and a Bolt session and provides CRUD op functions.
+    Also defines a run-loop for a generic executor, and a graph state
+    verification function.
+    """
+
+    def __init__(self, graph, session):
+        self.graph = graph
+        self.session = session
+        self._start_time = time.time()
+
+    @property
+    def v(self):
+        return self.graph.vertices
+
+    @property
+    def e(self):
+        return self.graph.edges
+
+    def execute_basic(self, query):
+        log.debug("Executing query: %s", query)
+        try:
+            return self.session.run(query).data()
+        except Exception as e:
+            self.graph.add_query_failure(str(e))
+            return None
+
+    def execute(self, query):
+        with self.graph.query_execution_synchronizer.run():
+            return self.execute_basic(query)
+
+    def create_vertex(self):
+        vertex_id = random_id()
+        self.execute("CREATE ({id: %r})" % vertex_id)
+        self.v.append(vertex_id)
+
+    def remove_vertex(self):
+        vertex_id = self.v.random()
+        result = self.execute(
+            "MATCH (n {id: %r}) OPTIONAL MATCH (n)-[r]-() "
+            "DETACH DELETE n RETURN n.id, labels(n), r.id" % vertex_id)
+        if result:
+            process_vertex_ids = set()
+            for row in result:
+                # remove the vertex, but note there could be duplicate rows
+                vertex_id = row['n.id']
+                if vertex_id not in process_vertex_ids:
+                    process_vertex_ids.add(vertex_id)
+                    self.v.remove(vertex_id)
+                    for label in row['labels(n)']:
+                        self.graph.labels[label].remove(vertex_id)
+                # remove the edge
+                edge_id = row['r.id']
+                if edge_id:
+                    self.e.remove(edge_id)
+
+    def create_edge(self):
+        eid = random_id()
+        creation = self.execute(
+            "MATCH (from {id: %r}), (to {id: %r}) "
+            "CREATE (from)-[e:EdgeType {id: %r}]->(to) RETURN e" % (
+                self.v.random(), self.v.random(), eid))
+        if creation:
+            self.e.append(eid)
+
+    def remove_edge(self):
+        edge_id = self.e.random()
+        result = self.execute(
+            "MATCH ()-[e {id: %r}]->() DELETE e RETURN e.id" % edge_id)
+        if result:
+            self.e.remove(edge_id)
+
+    def add_label(self):
+        vertex_id = self.v.random()
+        label = random.choice(list(self.graph.labels.keys()))
+        # add a label to a vertex that didn't have that label
+        # yet (we need that for book-keeping)
+        result = self.execute(
+            "MATCH (v {id: %r}) WHERE not v:%s SET v:%s RETURN v.id" % (
+                vertex_id, label, label))
+        if result:
+            self.graph.labels[label].append(vertex_id)
+
+    def verify_graph(self):
+        """ Checks if the local info corresponds to DB state """
+
+        def test(a, b, message):
+            assert set(a) == set(b), message % (len(a), len(b))
+
+        def get(query, key):
+            return [row[key] for row in self.execute_basic(query)]
+
+        # graph state verification must be run in isolation
+        with self.graph.query_execution_synchronizer.run_isolated():
+            test(self.v._list, get("MATCH (n) RETURN n.id", "n.id"),
+                 "Expected %d vertices, found %d")
+            test(self.e._list, get("MATCH ()-[r]->() RETURN r.id", "r.id"),
+                 "Expected %d edges, found %d")
+            for lab, exp in self.graph.labels.items():
+                test(get("MATCH (n:%s) RETURN n.id" % lab, "n.id"), exp._list,
+                     "Expected %d vertices with label '{}', found %d".format(
+                         lab))
+
+        log.info("Graph verification success:")
+        log.info("\tExecuted %d queries in %.2f seconds",
+                 self.graph.query_execution_synchronizer.count_total,
+                 time.time() - self._start_time)
+        log.info("\tGraph has %d vertices and %d edges",
+                 len(self.v), len(self.e))
+        for label in sorted(self.graph.labels.keys()):
+            log.info("\tVertices with label '%s': %d",
+                     label, len(self.graph.labels[label]))
+        failures = self.graph.query_failures()
+        if failures:
+            log.info("\tQuery failed (reason: count)")
+            for reason, count in failures.items():
+                log.info("\t\t'%s': %d", reason, count)
+
+    def run_loop(self, query_count, max_time):
+        start_time = time.time()
+        for _ in range(query_count):
+            if (time.time() - start_time) / 60 > max_time:
+                break
+
+            ratio_e = len(self.e) / self.graph.edge_count
+            ratio_v = len(self.v) / self.graph.vertex_count
+
+            # prefer adding/removing edges whenever there is an edge
+            # imbalance and there are enough vertices
+            if ratio_v > 0.5 and abs(1 - ratio_e) > 0.2:
+                if bernoulli(ratio_e / 2.0):
+                    self.remove_edge()
+                else:
+                    self.create_edge()
+                continue
+
+            # if we are near vertex balance, we can also do label updates
+            # instead of creates / deletes
+            if abs(1 - ratio_v) < 0.5 and bernoulli(0.5):
+                self.add_label()
+                continue
+
+            if bernoulli(ratio_v / 2.0):
+                self.remove_vertex()
+            else:
+                self.create_vertex()
+
+
+def parse_args():
+    argp = common.connection_argument_parser()
+    argp.add_argument("--logging", default="INFO",
+                      choices=["INFO", "DEBUG", "WARNING", "ERROR"],
+                      help="Logging level")
+    argp.add_argument("--vertex-count", type=int, required=True,
+                      help="The average number of vertices in the graph")
+    argp.add_argument("--edge-count", type=int, required=True,
+                      help="The average number of edges in the graph")
+    argp.add_argument("--prop-count", type=int, default=5,
+                      help="The max number of properties on a node")
+    argp.add_argument("--max-queries", type=int, default=2 ** 30,
+                      help="Maximum number of queries to execute")
+    argp.add_argument("--max-time", type=int, default=2 ** 30,
+                      help="Maximum execution time in minutes")
+    argp.add_argument("--verify", type=int, default=0,
+                      help="Interval (seconds) between checking local info")
+    argp.add_argument("--thread-count", type=int, default=1,
+                      help="The number of threads that operate on the graph "
+                           "independently")
+    return argp.parse_args()
+
+
+def main():
+    args = parse_args()
+    if args.logging:
+        logging.basicConfig(level=args.logging)
+        logging.getLogger("neo4j").setLevel(logging.WARNING)
+    log.info("Starting Memgraph long running test")
+
+    graph = Graph(args.vertex_count, args.edge_count)
+    driver = common.argument_driver(args)
+
+    # cleanup
+    driver.session().run("MATCH (n) DETACH DELETE n").consume()
+
+    if args.verify > 0:
+        log.info("Creating verification session")
+        verification_session = GraphSession(graph, driver.session())
+        common.periodically_execute(verification_session.verify_graph, (),
+                                    args.verify)
+        # TODO better verification failure handling
+
+    threads = []
+    for _ in range(args.thread_count):
+        log.info("Creating query runner thread")
+        session = GraphSession(graph, driver.session())
+        thread = Thread(target=session.run_loop,
+                        args=(args.max_queries // args.thread_count,
+                              args.max_time),
+                        daemon=True)
+        threads.append(thread)
+    list(map(Thread.start, threads))
+
+    list(map(Thread.join, threads))
+    driver.close()
+    log.info("All query runners done")
+
+
+if __name__ == '__main__':
+    main()
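+
+# Example invocation (illustrative values; connection flags omitted):
+#   ./long_running.py --vertex-count 1000 --edge-count 5000 \
+#       --max-time 60 --thread-count 4 --verify 30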