memgraph/tests/stress/continuous_integration
2022-07-26 20:54:56 +02:00

238 lines
7.0 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import atexit
import json
import multiprocessing
import os
import subprocess
import sys
import time
# Dataset calibrated for running on Apollo (total 4min):
#   - bipartite.py runs for approx. 30s
#   - create_match.py runs for approx. 30s
#   - the first long_running entry runs for 1min
#   - the second long_running entry runs for 2min
# (no runtime estimate is recorded here for parser.cpp)
#
# Every entry carries the test file name ("test"), the extra command-line
# flags handed to it ("options") and a "timeout" that run_test() multiplies
# by 60 before passing it to subprocess.run(), i.e. it is given in minutes.
SMALL_DATASET = [
    dict(
        test="bipartite.py",
        options=["--u-count", "100", "--v-count", "100"],
        timeout=5,
    ),
    dict(
        test="create_match.py",
        options=["--vertex-count", "40000", "--create-pack-size", "100"],
        timeout=5,
    ),
    dict(
        test="parser.cpp",
        options=["--per-worker-query-count", "1000"],
        timeout=5,
    ),
    dict(
        test="long_running.cpp",
        options=["--vertex-count", "1000", "--edge-count", "5000", "--max-time", "1", "--verify", "20"],
        timeout=5,
    ),
    dict(
        test="long_running.cpp",
        options=["--vertex-count", "10000", "--edge-count", "50000", "--max-time", "2", "--verify", "30"],
        timeout=5,
    ),
]
# Dataset calibrated for running on the daily stress instance (total 9h):
#   - bipartite.py and create_match.py run for approx. 15min
#   - long_running runs for 5min x 6 times = 30min
#   - long_running runs for 8h
#
# Entries have the same shape as in SMALL_DATASET: "test" (file name),
# "options" (extra command-line flags) and "timeout" (minutes).
LARGE_DATASET = (
    [
        {
            "test": "bipartite.py",
            "options": ["--u-count", "300", "--v-count", "300"],
            "timeout": 30,
        },
        {
            "test": "create_match.py",
            "options": ["--vertex-count", "500000", "--create-pack-size", "500"],
            "timeout": 30,
        },
    ]
    # A comprehension (rather than `[entry] * 6`) gives every repetition its
    # own dict and its own options list, so an in-place edit of one entry's
    # options cannot leak into the other five.
    + [
        {
            "test": "long_running.cpp",
            "options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"],
            "timeout": 16,
        }
        for _ in range(6)
    ]
    + [
        {
            "test": "long_running.cpp",
            "options": ["--vertex-count", "200000", "--edge-count", "1000000", "--max-time", "480", "--verify", "300"],
            "timeout": 500,
        },
    ]
)
# Paths: everything is resolved relative to the directory holding this script.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
# Two levels above the script directory.
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")

# Hidden files created next to the script.
MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem")
CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")

# Long running stats file: only the LAST entry of each dataset is told to
# write it (via --stats-file).
STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats")
for _dataset in (SMALL_DATASET, LARGE_DATASET):
    _dataset[-1]["options"].extend(["--stats-file", STATS_FILE])
# Number of worker threads: taken from the THREADS environment variable when
# it is set, otherwise the machine's CPU count. The value is always an int so
# that both sources yield the same type; a non-numeric THREADS is rejected
# here instead of being forwarded verbatim to memgraph and the test binaries.
if "THREADS" in os.environ:
    try:
        THREADS = int(os.environ["THREADS"])
    except ValueError:
        raise ValueError(
            "THREADS must be an integer, got {!r}".format(os.environ["THREADS"])
        ) from None
else:
    THREADS = multiprocessing.cpu_count()
def wait_for_server(port, delay=0.1):
    """Block until `nc` reports success for 127.0.0.1:<port>.

    The probe command `nc -z -w 1 127.0.0.1 <port>` is re-run every 10 ms
    until it exits with status 0. There is no upper bound on the number of
    attempts. After the first successful probe the function sleeps for
    `delay` more seconds before returning.

    Args:
        port: port number to probe (converted with str()).
        delay: extra seconds to sleep once the probe succeeded.
    """
    probe = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while True:
        if subprocess.call(probe) == 0:
            break
        time.sleep(0.01)
    time.sleep(delay)
# run test helper function
def run_test(args, test, options, timeout):
    """Run a single stress test and return how long it took.

    Args:
        args: parsed command-line arguments; `args.python` and `args.verbose`
            are read when `test` is a Python script.
        test: test file name. A name ending in ".py" is run as a script from
            SCRIPT_DIR with the `args.python` interpreter; a name ending in
            ".cpp" is mapped to the binary of the same name (extension
            dropped) under BUILD_DIR/tests/stress.
        options: list of extra command-line arguments appended to the command.
        timeout: time limit in minutes.

    Returns:
        Wall-clock runtime of the test in seconds (float).

    Raises:
        Exception: if the extension is unsupported or the test exits with a
            non-zero status.
        subprocess.TimeoutExpired: raised by subprocess.run() when the time
            limit is exceeded.
    """
    print("Running test '{}'".format(test))

    # Work out how to launch the test.
    if test.endswith(".py"):
        log_level = "DEBUG" if args.verbose else "WARNING"
        script_path = os.path.join(SCRIPT_DIR, test)
        launcher = [args.python, "-u", script_path, "--logging", log_level]
    elif test.endswith(".cpp"):
        # test[:-4] strips the ".cpp" suffix to obtain the binary name.
        launcher = [os.path.join(BUILD_DIR, "tests", "stress", test[:-4])]
    else:
        raise Exception("Test '{}' binary not supported!".format(test))

    # Launch it and time the run.
    command = launcher + ["--worker-count", str(THREADS)] + options
    started_at = time.time()
    completed = subprocess.run(command, cwd=SCRIPT_DIR, timeout=timeout * 60)
    exit_code = completed.returncode
    if exit_code != 0:
        raise Exception("Test '{}' binary returned non-zero ({})!".format(test, exit_code))
    elapsed = time.time() - started_at

    print(" Done after {:.3f} seconds".format(elapsed))
    return elapsed
# Parse arguments. Paths default to locations derived from BUILD_DIR and
# SCRIPT_DIR; an empty --log-file / --data-directory means "do not pass the
# flag to memgraph".
parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph"))
for _path_flag in ("--log-file", "--data-directory"):
    parser.add_argument(_path_flag, default="")
parser.add_argument("--python", default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"))
# Boolean switches: False unless present on the command line.
for _switch in ("--large-dataset", "--use-ssl", "--verbose"):
    parser.add_argument(_switch, action="store_true")
args = parser.parse_args()
# Generate a temporary self-signed certificate and unencrypted private key
# (KEY_FILE / CERT_FILE) when SSL was requested.
if args.use_ssl:
    # https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively
    subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com"
    openssl_cmd = ["openssl", "req", "-new", "-newkey", "rsa:4096", "-days", "365", "-nodes", "-x509"]
    openssl_cmd += ["-subj", subj]
    openssl_cmd += ["-keyout", KEY_FILE, "-out", CERT_FILE]
    # Raises subprocess.CalledProcessError if openssl exits non-zero.
    subprocess.check_call(openssl_cmd)
# Start memgraph from the directory that contains its binary.
cwd = os.path.dirname(args.memgraph)
cmd = [
    args.memgraph,
    "--bolt-num-workers=" + str(THREADS),
    "--storage-properties-on-edges=true",
    "--storage-snapshot-on-exit=true",
    "--storage-snapshot-interval-sec=600",
    "--storage-snapshot-retention-count=1",
    "--storage-wal-enabled=true",
    "--storage-recover-on-startup=false",
    "--query-execution-timeout-sec=1200",
]
# Optional flags, driven by the command-line arguments of this script.
if not args.verbose:
    cmd += ["--log-level", "WARNING"]
if args.log_file:
    cmd += ["--log-file", args.log_file]
if args.data_directory:
    cmd += ["--data-directory", args.data_directory]
if args.use_ssl:
    cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE]
proc_mg = subprocess.Popen(cmd, cwd=cwd)
# Wait until port 7687 accepts connections (presumably memgraph's default
# Bolt port, since no port flag is passed above -- confirm).
# NOTE(review): wait_for_server() retries without limit, so if memgraph exits
# before opening the port this call never returns and the check below is not
# reached.
wait_for_server(7687)
# Explicit raise instead of `assert`, so the check survives `python -O`.
if proc_mg.poll() is not None:
    raise Exception("The database binary died prematurely!")
# at exit cleanup
@atexit.register
def cleanup():
    """Kill the memgraph process at interpreter exit if it is still running.

    Does nothing when the process has already exited (poll() returns its
    exit status instead of None); otherwise kills it and waits for it.
    """
    # proc_mg is only read here, so no `global` declaration is needed.
    if proc_mg.poll() is not None:
        return
    proc_mg.kill()
    proc_mg.wait()
# Run every test of the selected dataset and record its runtime.
runtimes = {}
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
# Extra flags are added to a per-run copy of the options. Appending to
# test["options"] in place would modify the dataset itself, and a dataset
# entry that is listed several times would collect the flag once per run.
ssl_options = ["--use-ssl"] if args.use_ssl else []
for test in dataset:
    runtime = run_test(args, **dict(test, options=test["options"] + ssl_options))
    # Keyed by the test name without extension, so entries that share a test
    # file overwrite each other and only the last runtime is kept.
    runtimes[os.path.splitext(test["test"])[0]] = runtime
# Stop memgraph and collect its exit status.
proc_mg.terminate()
ret_mg = proc_mg.wait()
# Clean up the certificates BEFORE reporting a bad exit status, so the
# temporary private key and certificate are not left on disk when memgraph
# returned non-zero.
if args.use_ssl:
    os.remove(KEY_FILE)
    os.remove(CERT_FILE)
if ret_mg != 0:
    raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg))
# Measurements: one "<name>.runtime <seconds>" line per recorded test, followed
# by two counters read from the long running stats file.
report_lines = ["{}.runtime {}\n".format(name, seconds) for name, seconds in runtimes.items()]
with open(STATS_FILE) as stats_in:
    # First line is reported as executed queries, second line as failed ones.
    stats = stats_in.read().split("\n")
report_lines.append("long_running.queries.executed {}\n".format(stats[0]))
report_lines.append("long_running.queries.failed {}\n".format(stats[1]))
measurements = "".join(report_lines)
with open(MEASUREMENTS_FILE, "w") as report_out:
    report_out.write(measurements)
print("Done!")