diff --git a/src/communication/worker.hpp b/src/communication/worker.hpp index 5c101368b..6f09cf4f6 100644 --- a/src/communication/worker.hpp +++ b/src/communication/worker.hpp @@ -70,7 +70,7 @@ class Worker } void OnClose(Session &session) { - std::cout << fmt::format("Client {}:{} closed the connection.", + LOG(INFO) << fmt::format("Client {}:{} closed the connection.", session.socket_.endpoint().address(), session.socket_.endpoint().port()) << std::endl; diff --git a/src/data_structures/list/lockfree_list.hpp b/src/data_structures/list/lockfree_list.hpp index 57f6a35d1..3c66ddfb2 100644 --- a/src/data_structures/list/lockfree_list.hpp +++ b/src/data_structures/list/lockfree_list.hpp @@ -160,7 +160,6 @@ class List : Lockable { // garbage collector traverses this list and therefore it will become // deletable if (it.prev == nullptr) { - std::cout << "prev null" << std::endl; return false; } diff --git a/src/io/network/stream_reader.hpp b/src/io/network/stream_reader.hpp index 206bbe982..077cebc6a 100644 --- a/src/io/network/stream_reader.hpp +++ b/src/io/network/stream_reader.hpp @@ -22,9 +22,6 @@ class StreamReader : public StreamListener { Socket s; if (!socket.Accept(&s)) return false; - std::cout << fmt::format("Client {}:{} connected.", s.endpoint().address(), - s.endpoint().port()) - << std::endl; DLOG(INFO) << fmt::format( "Accepted a connection: socket {}, address '{}', family {}, port {}", s.id(), s.endpoint().address(), s.endpoint().family(), diff --git a/tests/stress/bipartite.py b/tests/stress/bipartite.py index 70de53c11..e03609f60 100755 --- a/tests/stress/bipartite.py +++ b/tests/stress/bipartite.py @@ -20,9 +20,12 @@ def parse_args(): :return: parsed arguments ''' parser = connection_argument_parser() - parser.add_argument('--thread-count', type=int, + parser.add_argument('--worker-count', type=int, default=multiprocessing.cpu_count(), help='Number of concurrent workers.') + parser.add_argument("--logging", default="INFO", + choices=["INFO", "DEBUG", "WARNING", "ERROR"], + help="Logging level") parser.add_argument('--u-count', type=int, default=100, help='Size of U set in the bipartite graph.') parser.add_argument('--v-count', type=int, default=100, @@ -135,7 +138,7 @@ def execution_handler(): vertices_create_end_time - cleanup_end_time) # concurrent create execution & tests - with multiprocessing.Pool(args.thread_count) as p: + with multiprocessing.Pool(args.worker_count) as p: create_edges_start_time = time.time() for worker_id, create_time, time_unit, no_failures in \ p.map(create_u_v_edges, [i for i in range(args.u_count)]): @@ -195,7 +198,9 @@ def execution_handler(): if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=args.logging) + if args.logging != "DEBUG": + logging.getLogger("neo4j").setLevel(logging.WARNING) output_data.add_status("stress_test_name", "bipartite") output_data.add_status("number_of_vertices", args.u_count + args.v_count) output_data.add_status("number_of_edges", args.u_count * args.v_count) @@ -203,4 +208,5 @@ if __name__ == '__main__': output_data.add_status("edge_batching", args.edge_batching) output_data.add_status("edge_batch_size", args.edge_batch_size) execution_handler() - output_data.dump() + if args.logging in ["DEBUG", "INFO"]: + output_data.dump() diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration index 97d64cde9..13ac405a9 100755 --- a/tests/stress/continuous_integration +++ b/tests/stress/continuous_integration @@ -2,43 +2,68 @@ # -*- coding: utf-8 -*- import argparse +import atexit import json import multiprocessing import os import subprocess import sys +import time -# dataset calibrated for running on Apollo (approx. 1 min per test) +# dataset calibrated for running on Apollo (total 4min) +# bipartite runs for approx. 30s +# create_match runs for approx. 30s +# long_running runs for 1min +# long_running runs for 2min SMALL_DATASET = [ { "test": "bipartite.py", "options": ["--u-count", "100", "--v-count", "100"], + "timeout": 5, }, { "test": "create_match.py", "options": ["--vertex-count", "40000", "--create-pack-size", "100"], + "timeout": 5, }, { "test": "long_running.py", - "options": ["--vertex-count", "1000", "--edge-count", "1000", "--max-time", "2"], + "options": ["--vertex-count", "1000", "--edge-count", "1000", "--max-time", "1", "--verify", "20"], + "timeout": 5, + }, + { + "test": "long_running.py", + "options": ["--vertex-count", "1000", "--edge-count", "1000", "--max-time", "2", "--verify", "30"], + "timeout": 5, }, ] -# dataset calibrated for running on daily stress instance +# dataset calibrated for running on daily stress instance (total 9h) # bipartite and create_match run for approx. 15min -# long_running runs for approx. 8h +# long_running runs for 5min x 6 times = 30min +# long_running runs for 8h LARGE_DATASET = [ { "test": "bipartite.py", "options": ["--u-count", "300", "--v-count", "300"], + "timeout": 30, }, { "test": "create_match.py", "options": ["--vertex-count", "500000", "--create-pack-size", "500"], + "timeout": 30, }, +] + [ { "test": "long_running.py", - "options": ["--vertex-count", "100000", "--edge-count", "100000", "--max-time", "480"], + "options": ["--vertex-count", "100000", "--edge-count", "100000", "--max-time", "5", "--verify", "60"], + "timeout": 8, + }, +] * 6 + [ + { + "test": "long_running.py", + "options": ["--vertex-count", "200000", "--edge-count", "2000000", "--max-time", "480", "--verify", "300"], + "timeout": 500, }, ] @@ -48,39 +73,31 @@ BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) BUILD_DIR = os.path.join(BASE_DIR, "build") CONFIG_DIR = os.path.join(BASE_DIR, "config") -def run_test(args, test, options): +# get number of threads +if "THREADS" in os.environ: + THREADS = os.environ["THREADS"] +else: + THREADS = multiprocessing.cpu_count() + + +# run test helper function +def run_test(args, test, options, timeout): print("Running test '{}'".format(test)) - # get number of threads - if "THREADS" in os.environ: - threads = os.environ["THREADS"] - else: - threads = multiprocessing.cpu_count() - - # start memgraph - cwd = os.path.dirname(args.memgraph) - cmd = [args.memgraph, "--num-workers=" + str(threads)] - stdout = open("/dev/null", "w") if not args.verbose else None - proc_mg = subprocess.Popen(cmd, cwd = cwd, stdout = stdout, - env = {"MEMGRAPH_CONFIG": args.config}) - # start test - cmd = [args.python, os.path.join(SCRIPT_DIR, test), "--thread-count", - str(threads)] + options - stderr = open("/dev/null", "w") if not args.verbose else None - ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, stderr = stderr) + logging = "DEBUG" if args.verbose else "WARNING" + cmd = [args.python, "-u", os.path.join(SCRIPT_DIR, test), "--worker-count", + str(THREADS), "--logging", logging] + options + start = time.time() + ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, timeout = timeout * 60) - # stop memgraph - proc_mg.terminate() - ret_mg = proc_mg.wait() - - if ret_mg != 0: - raise Exception("Memgraph binary returned non-zero ({})!".format( - ret_mg)) if ret_test.returncode != 0: raise Exception("Test '{}' binary returned non-zero ({})!".format( test, ret_test.returncode)) + runtime = time.time() - start + print(" Done after {:.3f} seconds".format(runtime)) + # parse arguments parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.") @@ -88,6 +105,8 @@ parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR, "memgraph")) parser.add_argument("--config", default = os.path.join(CONFIG_DIR, "stress.conf")) +parser.add_argument("--log-file", default = "") +parser.add_argument("--snapshot-directory", default = "") parser.add_argument("--python", default = os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"), type = str) parser.add_argument("--large-dataset", action = "store_const", @@ -96,8 +115,36 @@ parser.add_argument("--verbose", action = "store_const", const = True, default = False) args = parser.parse_args() +# start memgraph +cwd = os.path.dirname(args.memgraph) +cmd = [args.memgraph, "--num-workers=" + str(THREADS)] +if not args.verbose: + cmd += ["--minloglevel", "1"] +if args.log_file: + cmd += ["--log-file", args.log_file] +if args.snapshot_directory: + cmd += ["--snapshot-directory", args.snapshot_directory] +proc_mg = subprocess.Popen(cmd, cwd = cwd, + env = {"MEMGRAPH_CONFIG": args.config}) +time.sleep(1.0) + +# at exit cleanup +@atexit.register +def cleanup(): + global proc_mg + if proc_mg.poll() != None: return + proc_mg.kill() + proc_mg.wait() + # run tests dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET for test in dataset: run_test(args, **test) + +# stop memgraph +proc_mg.terminate() +ret_mg = proc_mg.wait() +if ret_mg != 0: + raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg)) + print("Done!") diff --git a/tests/stress/create_match.py b/tests/stress/create_match.py index 68b31e69f..38f2cb6b1 100755 --- a/tests/stress/create_match.py +++ b/tests/stress/create_match.py @@ -25,9 +25,12 @@ def parse_args(): parser = connection_argument_parser() # specific - parser.add_argument('--thread-count', type=int, + parser.add_argument('--worker-count', type=int, default=multiprocessing.cpu_count(), help='Number of concurrent workers.') + parser.add_argument("--logging", default="INFO", + choices=["INFO", "DEBUG", "WARNING", "ERROR"], + help="Logging level") parser.add_argument('--vertex-count', type=int, default=100, help='Number of created vertices.') parser.add_argument('--max-property-value', type=int, default=1000, @@ -94,14 +97,14 @@ def create_handler(): session.run("MATCH (n) DETACH DELETE n").consume() # concurrent create execution & tests - with multiprocessing.Pool(args.thread_count) as p: + with multiprocessing.Pool(args.worker_count) as p: for worker_id, create_time, time_unit in \ - p.map(create_worker, [i for i in range(args.thread_count)]): + p.map(create_worker, [i for i in range(args.worker_count)]): log.info('Worker ID: %s; Create time: %s%s' % (worker_id, create_time, time_unit)) # check total count - expected_total_count = args.thread_count * args.vertex_count + expected_total_count = args.worker_count * args.vertex_count total_count = session.run( 'MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt'] assert total_count == expected_total_count, \ @@ -110,5 +113,7 @@ def create_handler(): if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=args.logging) + if args.logging != "DEBUG": + logging.getLogger("neo4j").setLevel(logging.WARNING) create_handler() diff --git a/tests/stress/long_running.py b/tests/stress/long_running.py index 7461cb16a..590f1c7e9 100755 --- a/tests/stress/long_running.py +++ b/tests/stress/long_running.py @@ -277,11 +277,11 @@ class GraphSession(): def runner(params): num, args = params driver = common.argument_driver(args) - graph = Graph(args.vertex_count // args.thread_count, - args.edge_count // args.thread_count) + graph = Graph(args.vertex_count // args.worker_count, + args.edge_count // args.worker_count) log.info("Starting query runner process") session = GraphSession(num, graph, driver.session()) - session.run_loop(args.vertex_batch, args.max_queries // args.thread_count, + session.run_loop(args.vertex_batch, args.max_queries // args.worker_count, args.max_time, args.verify) log.info("Runner %d executed %d queries", num, session.executed_queries) driver.close() @@ -307,8 +307,8 @@ def parse_args(): help="Maximum execution time in minutes") argp.add_argument("--verify", type=int, default=0, help="Interval (seconds) between checking local info") - argp.add_argument("--thread-count", type=int, default=1, - help="The number of threads that operate on the graph " + argp.add_argument("--worker-count", type=int, default=1, + help="The number of workers that operate on the graph " "independently") return argp.parse_args() @@ -323,13 +323,13 @@ def main(): # cleanup and create indexes driver = common.argument_driver(args) driver.session().run("MATCH (n) DETACH DELETE n").consume() - for i in range(args.thread_count): + for i in range(args.worker_count): label = INDEX_FORMAT.format(i) driver.session().run("CREATE INDEX ON :%s(id)" % label).consume() driver.close() - params = [(i, args) for i in range(args.thread_count)] - with multiprocessing.Pool(args.thread_count) as p: + params = [(i, args) for i in range(args.worker_count)] + with multiprocessing.Pool(args.worker_count) as p: p.map(runner, params, 1) log.info("All query runners done")