#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import atexit
import json
import multiprocessing
import os
import subprocess
import sys
import time

# dataset calibrated for running on Apollo (total 4min)
# bipartite.py runs for approx. 30s
# create_match.py runs for approx. 30s
# long_running runs for 1min
# long_running runs for 2min
# ("timeout" is given in minutes, see run_test)
SMALL_DATASET = [
    {
        "test": "bipartite.py",
        "options": ["--u-count", "100", "--v-count", "100"],
        "timeout": 5,
    },
    {
        "test": "create_match.py",
        "options": ["--vertex-count", "40000", "--create-pack-size", "100"],
        "timeout": 5,
    },
    {
        "test": "long_running.cpp",
        "options": ["--vertex-count", "1000", "--edge-count", "5000",
                    "--max-time", "1", "--verify", "20"],
        "timeout": 5,
    },
    {
        "test": "long_running.cpp",
        "options": ["--vertex-count", "10000", "--edge-count", "50000",
                    "--max-time", "2", "--verify", "30"],
        "timeout": 5,
    },
]

# dataset calibrated for running on daily stress instance (total 9h)
# bipartite.py and create_match.py run for approx. 15min
# long_running runs for 5min x 6 times = 30min
# long_running runs for 8h
# NOTE: the `* 6` repetition below yields six references to the SAME dict
# (and the same "options" list), so these entries must not be mutated in place.
LARGE_DATASET = [
    {
        "test": "bipartite.py",
        "options": ["--u-count", "300", "--v-count", "300"],
        "timeout": 30,
    },
    {
        "test": "create_match.py",
        "options": ["--vertex-count", "500000", "--create-pack-size", "500"],
        "timeout": 30,
    },
] + [
    {
        "test": "long_running.cpp",
        "options": ["--vertex-count", "10000", "--edge-count", "40000",
                    "--max-time", "5", "--verify", "60"],
        "timeout": 8,
    },
] * 6 + [
    {
        "test": "long_running.cpp",
        "options": ["--vertex-count", "200000", "--edge-count", "1000000",
                    "--max-time", "480", "--verify", "300"],
        "timeout": 500,
    },
]

# paths
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem")
CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")

# long running stats file
# (only the last test of each dataset is told to write its stats there)
STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats")
SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])

# get number of threads
# (a str when taken from the environment, an int otherwise; every use below
# passes it through str())
if "THREADS" in os.environ:
    THREADS = os.environ["THREADS"]
else:
    THREADS = multiprocessing.cpu_count()


def get_build_dir():
    """Return the build directory to use.

    Prefers `build_release`, then `build_community` (whichever exists under
    BASE_DIR), and falls back to `build` without checking that it exists.
    """
    for name in ("build_release", "build_community"):
        candidate = os.path.join(BASE_DIR, name)
        if os.path.exists(candidate):
            return candidate
    return os.path.join(BASE_DIR, "build")


def wait_for_server(port, delay=0.1, proc=None):
    """Block until something accepts TCP connections on 127.0.0.1:`port`.

    The port is probed with `nc -z`; once a probe succeeds the function
    sleeps for another `delay` seconds before returning.

    If `proc` (a `subprocess.Popen`) is given, the wait is abandoned as soon
    as that process has exited. Without this the loop would spin forever when
    the server dies before it starts listening. The caller is expected to
    check `proc.poll()` afterwards to tell the two outcomes apart.
    """
    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while subprocess.call(cmd) != 0:
        if proc is not None and proc.poll() is not None:
            # the server process is gone, the port will never open
            return
        time.sleep(0.01)
    time.sleep(delay)


# run test helper function
def run_test(args, test, options, timeout):
    """Run a single stress test and return its wall-clock runtime in seconds.

    `test` is the test file name: `.py` tests are run from SCRIPT_DIR with
    the interpreter given by `args.python`, `.cpp` tests are run as the
    binary of the same name (extension stripped) from the build directory's
    `tests/stress` folder. `options` are appended to the command line and
    `timeout` is given in minutes.

    Raises Exception for an unsupported test extension or a non-zero exit
    code; `subprocess.run` raises `subprocess.TimeoutExpired` when the
    timeout elapses.
    """
    print("Running test '{}'".format(test))

    # find binary
    if test.endswith(".py"):
        log_level = "DEBUG" if args.verbose else "WARNING"
        binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test),
                  "--logging", log_level]
    elif test.endswith(".cpp"):
        exe = os.path.join(get_build_dir(), "tests", "stress", test[:-4])
        binary = [exe]
    else:
        raise Exception("Test '{}' binary not supported!".format(test))

    # start test
    cmd = binary + ["--worker-count", str(THREADS)] + options
    start = time.time()
    ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60)

    if ret_test.returncode != 0:
        raise Exception("Test '{}' binary returned non-zero ({})!".format(
            test, ret_test.returncode))

    runtime = time.time() - start
    print("    Done after {:.3f} seconds".format(runtime))

    return runtime


# parse arguments
parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
parser.add_argument("--memgraph",
                    default=os.path.join(get_build_dir(), "memgraph"))
parser.add_argument("--log-file", default="")
parser.add_argument("--data-directory", default="")
parser.add_argument("--python",
                    default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"),
                    type=str)
parser.add_argument("--large-dataset", action="store_const",
                    const=True, default=False)
parser.add_argument("--use-ssl", action="store_const",
                    const=True, default=False)
parser.add_argument("--verbose", action="store_const",
                    const=True, default=False)
args = parser.parse_args()

# generate temporary SSL certs
if args.use_ssl:
    # https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively
    subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com"
    subprocess.run(["openssl", "req", "-new", "-newkey", "rsa:4096",
                    "-days", "365", "-nodes", "-x509", "-subj", subj,
                    "-keyout", KEY_FILE, "-out", CERT_FILE], check=True)

# start memgraph
cwd = os.path.dirname(args.memgraph)
cmd = [args.memgraph, "--bolt-num-workers=" + str(THREADS),
       "--storage-properties-on-edges=true",
       "--storage-snapshot-on-exit=true",
       "--storage-snapshot-interval-sec=600",
       "--storage-snapshot-retention-count=1",
       "--storage-wal-enabled=true",
       "--storage-recover-on-startup=false",
       "--query-execution-timeout-sec=1200"]
if not args.verbose:
    cmd += ["--min-log-level", "1"]
if args.log_file:
    cmd += ["--log-file", args.log_file]
if args.data_directory:
    cmd += ["--data-directory", args.data_directory]
if args.use_ssl:
    cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE]
proc_mg = subprocess.Popen(cmd, cwd=cwd)
# pass the process so that a crash during startup ends the wait instead of
# hanging the whole run
wait_for_server(7687, proc=proc_mg)
# explicit raise instead of assert so the check survives `python -O`
if proc_mg.poll() is not None:
    raise Exception("The database binary died prematurely!")
# at exit cleanup
@atexit.register
def cleanup():
    """Kill the Memgraph process at interpreter exit if it is still running."""
    if proc_mg.poll() is not None:
        # already exited (normal shutdown below, or it crashed)
        return
    proc_mg.kill()
    proc_mg.wait()


# run tests
# NOTE: runtimes is keyed by the test name without its extension, so when a
# dataset runs the same test several times only the last runtime is kept.
runtimes = {}
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
for test in dataset:
    # Work on a copy of the options: LARGE_DATASET contains the same dict
    # object several times (list repetition), so appending to the shared list
    # in place would add "--use-ssl" once more on every repeated run.
    options = list(test["options"])
    if args.use_ssl:
        options.append("--use-ssl")
    runtime = run_test(args, **dict(test, options=options))
    runtimes[os.path.splitext(test["test"])[0]] = runtime

# stop memgraph
proc_mg.terminate()
ret_mg = proc_mg.wait()
if ret_mg != 0:
    raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg))

# cleanup certificates
if args.use_ssl:
    os.remove(KEY_FILE)
    os.remove(CERT_FILE)

# measurements
lines = ["{}.runtime {}\n".format(key, value)
         for key, value in runtimes.items()]
with open(STATS_FILE) as f:
    # first line: executed query count, second line: failed query count
    stats = f.read().split("\n")
lines.append("long_running.queries.executed {}\n".format(stats[0]))
lines.append("long_running.queries.failed {}\n".format(stats[1]))
with open(MEASUREMENTS_FILE, "w") as f:
    f.write("".join(lines))

print("Done!")