From 26032098498bb2535b712702023e6fcba8018f61 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Mon, 18 Sep 2017 14:30:27 +0200 Subject: [PATCH] Replaced Python long running test with C++ version Reviewers: buda, mislav.bradac Reviewed By: mislav.bradac Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D802 --- tests/CMakeLists.txt | 3 + tests/stress/CMakeLists.txt | 38 +++ tests/stress/continuous_integration | 26 +- tests/{manual => stress}/long_running.cpp | 18 +- tests/stress/long_running.py | 339 ---------------------- tools/apollo/build_debug | 2 +- tools/apollo/build_diff | 2 +- tools/apollo/generate | 4 +- 8 files changed, 75 insertions(+), 357 deletions(-) create mode 100644 tests/stress/CMakeLists.txt rename tests/{manual => stress}/long_running.cpp (97%) delete mode 100755 tests/stress/long_running.py diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index cef99463b..d254f7665 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -30,6 +30,9 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/benchmark) # macro_benchmark test binaries add_subdirectory(${PROJECT_SOURCE_DIR}/macro_benchmark) +# stress test binaries +add_subdirectory(${PROJECT_SOURCE_DIR}/stress) + # concurrent test binaries add_subdirectory(${PROJECT_SOURCE_DIR}/concurrent) diff --git a/tests/stress/CMakeLists.txt b/tests/stress/CMakeLists.txt new file mode 100644 index 000000000..b402afebd --- /dev/null +++ b/tests/stress/CMakeLists.txt @@ -0,0 +1,38 @@ +find_package(Threads REQUIRED) + +# set current directory name as a test type +get_filename_component(test_type ${CMAKE_CURRENT_SOURCE_DIR} NAME) + +# get all cpp abs file names recursively starting from current directory +file(GLOB_RECURSE test_type_cpps *.cpp) +message(STATUS "Available ${test_type} cpp files are: ${test_type_cpps}") + +# add target that depends on all other targets +set(all_targets_target ${project_name}__${test_type}) +add_custom_target(${all_targets_target}) + +# for each cpp file 
build binary and register test +foreach(test_cpp ${test_type_cpps}) + + # get exec name (remove extension from the abs path) + get_filename_component(exec_name ${test_cpp} NAME_WE) + + # set target name in format {project_name}__{test_type}__{exec_name} + set(target_name ${project_name}__${test_type}__${exec_name}) + + # build exec file + add_executable(${target_name} ${test_cpp}) + set_property(TARGET ${target_name} PROPERTY CXX_STANDARD ${cxx_standard}) + + # OUTPUT_NAME sets the real name of a target when it is built and can be + # used to help create two targets of the same name even though CMake + # requires unique logical target names + set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name}) + + # link libraries + target_link_libraries(${target_name} memgraph_lib) + + # add target to dependencies + add_dependencies(${all_targets_target} ${target_name}) + +endforeach() diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration index 2ce170c19..bbbe63446 100755 --- a/tests/stress/continuous_integration +++ b/tests/stress/continuous_integration @@ -27,12 +27,12 @@ SMALL_DATASET = [ "timeout": 5, }, { - "test": "long_running.py", + "test": "long_running.cpp", "options": ["--vertex-count", "1000", "--edge-count", "1000", "--max-time", "1", "--verify", "20"], "timeout": 5, }, { - "test": "long_running.py", + "test": "long_running.cpp", "options": ["--vertex-count", "1000", "--edge-count", "1000", "--max-time", "2", "--verify", "30"], "timeout": 5, }, @@ -55,13 +55,13 @@ LARGE_DATASET = [ }, ] + [ { - "test": "long_running.py", + "test": "long_running.cpp", "options": ["--vertex-count", "100000", "--edge-count", "100000", "--max-time", "5", "--verify", "60"], "timeout": 8, }, ] * 6 + [ { - "test": "long_running.py", + "test": "long_running.cpp", "options": ["--vertex-count", "200000", "--edge-count", "2000000", "--max-time", "480", "--verify", "300"], "timeout": 500, }, @@ -84,10 +84,22 @@ else: def run_test(args, test, 
options, timeout): print("Running test '{}'".format(test)) + # find binary + if test.endswith(".py"): + logging = "DEBUG" if args.verbose else "WARNING" + binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test), + "--logging", logging] + elif test.endswith(".cpp"): + exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4]) + if not os.path.exists(exe): + exe = os.path.join(BASE_DIR, "build_release", "tests", "stress", + test[:-4]) + binary = [exe] + else: + raise Exception("Test '{}' binary not supported!".format(test)) + # start test - logging = "DEBUG" if args.verbose else "WARNING" - cmd = [args.python, "-u", os.path.join(SCRIPT_DIR, test), "--worker-count", - str(THREADS), "--logging", logging] + options + cmd = binary + ["--worker-count", str(THREADS)] + options start = time.time() ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, timeout = timeout * 60) diff --git a/tests/manual/long_running.cpp b/tests/stress/long_running.cpp similarity index 97% rename from tests/manual/long_running.cpp rename to tests/stress/long_running.cpp index 6d86455c8..3b942cedb 100644 --- a/tests/manual/long_running.cpp +++ b/tests/stress/long_running.cpp @@ -22,14 +22,14 @@ DEFINE_string(password, "", "Password for the database"); DEFINE_int32(vertex_count, 0, "The average number of vertices in the graph"); DEFINE_int32(edge_count, 0, "The average number of edges in the graph"); -DEFINE_int32(vertex_batch, 200, - "The number of vertices to be created simultaneously"); DEFINE_int32(prop_count, 5, "The max number of properties on a node"); DEFINE_uint64(max_queries, 1 << 30, "Maximum number of queries to execute"); DEFINE_int32(max_time, 1, "Maximum execution time in minutes"); DEFINE_int32(verify, 0, "Interval (seconds) between checking local info"); DEFINE_int32(worker_count, 1, "The number of workers that operate on the graph independently"); +DEFINE_bool(global_queries, false, + "If queries that modify globally should be executed sometimes"); /** * Encapsulates a Graph and 
a Bolt session and provides CRUD op functions. @@ -309,13 +309,15 @@ class GraphSession { double ratio_v = (double)vertices_.size() / (double)vertex_count; // try to edit vertices globally - if (Bernoulli(0.01)) { - UpdateGlobalVertices(); - } + if (FLAGS_global_queries) { + if (Bernoulli(0.01)) { + UpdateGlobalVertices(); + } - // try to edit edges globally - if (Bernoulli(0.01)) { - UpdateGlobalEdges(); + // try to edit edges globally + if (Bernoulli(0.01)) { + UpdateGlobalEdges(); + } } // prefer adding/removing edges whenever there is an edge diff --git a/tests/stress/long_running.py b/tests/stress/long_running.py deleted file mode 100755 index 590f1c7e9..000000000 --- a/tests/stress/long_running.py +++ /dev/null @@ -1,339 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -A long running test that performs -random CRUD ops on a bolt database. - -Parameterized with vertex and edge counts around which -the graph state oscilates. -""" - -import logging -import multiprocessing -import neo4j.exceptions -import random -import time -from collections import defaultdict - -import common - - -log = logging.getLogger(__name__) - - -INDEX_FORMAT = "indexed_label{}" - - -def random_element(lst): - return lst[random.randint(0, len(lst) - 1)] - - -def bernoulli(p): - return random.random() < p - - -class Graph(): - """ - Exposes functions for working on a graph, and tracks some - statistics about graph state. 
- """ - - def __init__(self, vertex_count, edge_count, labels=5): - """ - Args: - vertex_count - int, desired vertex count - edge_count - int, desired edge count - labels - int, the number of labels to use - """ - - # desired vertex and edge counts - self.vertex_count = vertex_count - self.edge_count = edge_count - - # storage - self.edges = [] - self.vertices = [] - self.labels = {"label%d" % i: [] for i in range(labels)} - - # info about query failures, maps exception string representations into - # occurence counts - self._query_failure_counts = defaultdict(int) - - def add_query_failure(self, reason): - self._query_failure_counts[reason] += 1 - - def query_failures(self): - return dict(self._query_failure_counts) - - -class GraphSession(): - """ - Encapsulates a Graph and a Bolt session and provides CRUD op functions. - Also defines a run-loop for a generic exectutor, and a graph state - verification function. - """ - - def __init__(self, sid, graph, session): - self.sid = sid - - # the label in the database that is indexed - # used for matching vertices faster - self.indexed_label = INDEX_FORMAT.format(sid) - - self.vertex_id = 1 - self.edge_id = 1 - - self.graph = graph - self.session = session - self.executed_queries = 0 - self._start_time = time.time() - - @property - def v(self): - return self.graph.vertices - - @property - def e(self): - return self.graph.edges - - def execute(self, query): - log.debug("Runner %d executing query: %s", self.sid, query) - self.executed_queries += 1 - try: - return self.session.run(query).data() - except neo4j.exceptions.ServiceUnavailable as e: - raise e - except Exception as e: - self.graph.add_query_failure(str(e)) - return None - - def create_vertices(self, vertices_count): - query = "" - if vertices_count == 0: return - for _ in range(vertices_count): - query += "CREATE (:%s {id: %r}) " % (self.indexed_label, - self.vertex_id) - self.v.append(self.vertex_id) - self.vertex_id += 1 - self.execute(query) - - def 
remove_vertex(self): - vertex_id = random_element(self.v) - result = self.execute( - "MATCH (n:%s {id: %r}) OPTIONAL MATCH (n)-[r]-() " - "DETACH DELETE n RETURN n.id, labels(n), r.id" % - (self.indexed_label, vertex_id)) - if result: - process_vertex_ids = set() - for row in result: - # remove vertex but note there could be duplicates - vertex_id = row['n.id'] - if vertex_id not in process_vertex_ids: - process_vertex_ids.add(vertex_id) - self.v.remove(vertex_id) - for label in row['labels(n)']: - if (label != self.indexed_label): - self.graph.labels[label].remove(vertex_id) - # remove edge - edge_id = row['r.id'] - if edge_id != None: - self.e.remove(edge_id) - - def create_edge(self): - creation = self.execute( - "MATCH (from:%s {id: %r}), (to:%s {id: %r}) " - "CREATE (from)-[e:EdgeType {id: %r}]->(to) RETURN e" % ( - self.indexed_label, random_element(self.v), self.indexed_label, - random_element(self.v), self.edge_id)) - if creation: - self.e.append(self.edge_id) - self.edge_id += 1 - - def remove_edge(self): - edge_id = random_element(self.e) - result = self.execute("MATCH (:%s)-[e {id: %r}]->(:%s) DELETE e " - "RETURN e.id" % (self.indexed_label, edge_id, - self.indexed_label)) - if result: - self.e.remove(edge_id) - - def add_label(self): - vertex_id = random_element(self.v) - label = random.choice(list(self.graph.labels.keys())) - # add a label on a vertex that didn't have that label - # yet (we need that for book-keeping) - result = self.execute("MATCH (v:%s {id: %r}) WHERE not v:%s SET v:%s " - "RETURN v.id" % (self.indexed_label, vertex_id, - label, label)) - if result: - self.graph.labels[label].append(vertex_id) - - def update_global_vertices(self): - lo = random.randint(0, self.vertex_id) - hi = lo + int(self.vertex_id * 0.01) - num = random.randint(0, 2 ** 20) - self.execute("MATCH (n) WHERE n.id > %d AND n.id < %d " - "SET n.value = %d" % (lo, hi, num)) - - def update_global_edges(self): - lo = random.randint(0, self.edge_id) - hi = lo + 
int(self.edge_id * 0.01) - num = random.randint(0, 2 ** 20) - self.execute("MATCH ()-[e]->() WHERE e.id > %d AND e.id < %d " - "SET e.value = %d" % (lo, hi, num)) - - def verify_graph(self): - """ Checks if the local info corresponds to DB state """ - - def test(obj, length, message): - assert len(obj) == length, message % (len(obj), length) - - def get(query, key): - ret = self.execute(query) - assert ret != None, "Query '{}' returned 'None'!".format(query) - return [row[key] for row in ret] - - test(self.v, get("MATCH (n:{}) RETURN count(n)".format( - self.indexed_label), "count(n)")[0], - "Expected %d vertices, found %d") - test(self.e, get("MATCH (:{0})-[r]->(:{0}) RETURN count(r)".format( - self.indexed_label), "count(r)")[0], - "Expected %d edges, found %d") - for lab, exp in self.graph.labels.items(): - test(exp, get("MATCH (n:%s:%s) RETURN count(n)" % ( - self.indexed_label, lab), "count(n)")[0], - "Expected %d vertices with label '{}', found %d".format( - lab)) - - log.info("Runner %d graph verification success:", self.sid) - log.info("\tExecuted %d queries in %.2f seconds", - self.executed_queries, time.time() - self._start_time) - log.info("\tGraph has %d vertices and %d edges", - len(self.v), len(self.e)) - for label in sorted(self.graph.labels.keys()): - log.info("\tVertices with label '%s': %d", - label, len(self.graph.labels[label])) - failures = self.graph.query_failures() - if failures: - log.info("\tQuery failed (reason: count)") - for reason, count in failures.items(): - log.info("\t\t'%s': %d", reason, count) - - def run_loop(self, vertex_batch, query_count, max_time, verify): - # start the test - start_time = last_verify = time.time() - - # initial batched vertex creation - for _ in range(self.graph.vertex_count // vertex_batch): - if (time.time() - start_time) / 60 > max_time \ - or self.executed_queries > query_count: - break - self.create_vertices(vertex_batch) - self.create_vertices(self.graph.vertex_count % vertex_batch) - - # run rest - 
while self.executed_queries < query_count: - now_time = time.time() - if (now_time - start_time) / 60 > max_time: - break - - if verify > 0 and (now_time - last_verify) > verify: - self.verify_graph() - last_verify = now_time - - ratio_e = len(self.e) / self.graph.edge_count - ratio_v = len(self.v) / self.graph.vertex_count - - # try to edit vertices globally - if bernoulli(0.01): - self.update_global_vertices() - - # try to edit edges globally - if bernoulli(0.01): - self.update_global_edges() - - # prefer adding/removing edges whenever there is an edge - # disbalance and there is enough vertices - if ratio_v > 0.5 and abs(1 - ratio_e) > 0.2: - if bernoulli(ratio_e / 2.0): - self.remove_edge() - else: - self.create_edge() - continue - - # if we are near vertex balance, we can also do updates - # instad of update / deletes - if abs(1 - ratio_v) < 0.5 and bernoulli(0.5): - self.add_label() - continue - - if bernoulli(ratio_v / 2.0): - self.remove_vertex() - else: - self.create_vertices(1) - - -def runner(params): - num, args = params - driver = common.argument_driver(args) - graph = Graph(args.vertex_count // args.worker_count, - args.edge_count // args.worker_count) - log.info("Starting query runner process") - session = GraphSession(num, graph, driver.session()) - session.run_loop(args.vertex_batch, args.max_queries // args.worker_count, - args.max_time, args.verify) - log.info("Runner %d executed %d queries", num, session.executed_queries) - driver.close() - - -def parse_args(): - argp = common.connection_argument_parser() - argp.add_argument("--logging", default="INFO", - choices=["INFO", "DEBUG", "WARNING", "ERROR"], - help="Logging level") - argp.add_argument("--vertex-count", type=int, required=True, - help="The average number of vertices in the graph") - argp.add_argument("--edge-count", type=int, required=True, - help="The average number of edges in the graph") - argp.add_argument("--vertex-batch", type=int, default=200, - help="The number of vertices to be 
created " - "simultaneously") - argp.add_argument("--prop-count", type=int, default=5, - help="The max number of properties on a node") - argp.add_argument("--max-queries", type=int, default=2 ** 30, - help="Maximum number of queries to execute") - argp.add_argument("--max-time", type=int, default=2 ** 30, - help="Maximum execution time in minutes") - argp.add_argument("--verify", type=int, default=0, - help="Interval (seconds) between checking local info") - argp.add_argument("--worker-count", type=int, default=1, - help="The number of workers that operate on the graph " - "independently") - return argp.parse_args() - - -def main(): - args = parse_args() - if args.logging: - logging.basicConfig(level=args.logging) - logging.getLogger("neo4j").setLevel(logging.WARNING) - log.info("Starting Memgraph long running test") - - # cleanup and create indexes - driver = common.argument_driver(args) - driver.session().run("MATCH (n) DETACH DELETE n").consume() - for i in range(args.worker_count): - label = INDEX_FORMAT.format(i) - driver.session().run("CREATE INDEX ON :%s(id)" % label).consume() - driver.close() - - params = [(i, args) for i in range(args.worker_count)] - with multiprocessing.Pool(args.worker_count) as p: - p.map(runner, params, 1) - - log.info("All query runners done") - - -if __name__ == '__main__': - main() diff --git a/tools/apollo/build_debug b/tools/apollo/build_debug index 0e9dfaf0f..2a9d910cd 100644 --- a/tools/apollo/build_debug +++ b/tools/apollo/build_debug @@ -17,7 +17,7 @@ mkdir build_release cd build_release cmake -DCMAKE_BUILD_TYPE=release .. 
-TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark +TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark memgraph__stress cd ../tools/apollo diff --git a/tools/apollo/build_diff b/tools/apollo/build_diff index 825ef85d4..1bafc273a 100644 --- a/tools/apollo/build_diff +++ b/tools/apollo/build_diff @@ -21,7 +21,7 @@ mkdir build_release cd build_release cmake -DCMAKE_BUILD_TYPE=release .. -TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark +TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark memgraph__stress cd ../../parent diff --git a/tools/apollo/generate b/tools/apollo/generate index c9ba4f0d9..e483a0a13 100755 --- a/tools/apollo/generate +++ b/tools/apollo/generate @@ -246,8 +246,10 @@ if mode == "diff": # stress tests stress_path = os.path.join(BASE_DIR, "tests", "stress") +stress_binary_path = os.path.join(BUILD_RELEASE_DIR, "tests", "stress") infile = create_archive("stress", [binary_release_path, - binary_release_link_path, stress_path, config_path], + binary_release_link_path, stress_path, stress_binary_path, + config_path], cwd = WORKSPACE_DIR) cmd = "cd memgraph/tests/stress\nTIMEOUT=600 ./continuous_integration" RUNS.append(generate_run("stress", commands = cmd, infile = infile))