memgraph/tests/stress/continuous_integration
Matej Ferencevic b3d1cd8257 Improve long running stress test
Summary:
The test is now stricter than before. Each verification step no longer just
checks that object counts are correct; it also verifies that all object IDs
are correct (see the sketch below). The continuous integration script now has
a more deterministic startup.

Reviewers: teon.banek, ipaljak

Reviewed By: teon.banek, ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2606
2019-12-23 14:18:52 +01:00
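
For context, the stricter check described in the summary compares the full set
of stored object IDs instead of a bare object count. A minimal sketch of the
idea in Python, assuming a hypothetical fetch_ids helper that returns the IDs
currently stored in the database (the real verification lives in the
long_running test, not in this script):

    def verify_objects(expected_ids, fetch_ids):
        got = set(fetch_ids())
        # old check: only the object count had to match
        assert len(got) == len(expected_ids), "object count mismatch"
        # new, stricter check: the exact set of object IDs must match
        assert got == set(expected_ids), "object ID set mismatch"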


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import atexit
import json
import multiprocessing
import os
import subprocess
import sys
import time

# dataset calibrated for running on Apollo (total 4min)
# bipartite.py runs for approx. 30s
# create_match.py runs for approx. 30s
# long_running runs for 1min
# long_running runs for 2min
SMALL_DATASET = [
    {
        "test": "bipartite.py",
        "options": ["--u-count", "100", "--v-count", "100"],
        "timeout": 5,
    },
    {
        "test": "create_match.py",
        "options": ["--vertex-count", "40000", "--create-pack-size", "100"],
        "timeout": 5,
    },
    {
        "test": "long_running.cpp",
        "options": ["--vertex-count", "1000", "--edge-count", "5000",
                    "--max-time", "1", "--verify", "20"],
        "timeout": 5,
    },
    {
        "test": "long_running.cpp",
        "options": ["--vertex-count", "10000", "--edge-count", "50000",
                    "--max-time", "2", "--verify", "30"],
        "timeout": 5,
    },
]

# dataset calibrated for running on daily stress instance (total 9h)
# bipartite.py and create_match.py run for approx. 15min
# long_running runs for 5min x 6 times = 30min
# long_running runs for 8h
LARGE_DATASET = [
    {
        "test": "bipartite.py",
        "options": ["--u-count", "300", "--v-count", "300"],
        "timeout": 30,
    },
    {
        "test": "create_match.py",
        "options": ["--vertex-count", "500000", "--create-pack-size", "500"],
        "timeout": 30,
    },
] + [
    {
        "test": "long_running.cpp",
        "options": ["--vertex-count", "10000", "--edge-count", "40000",
                    "--max-time", "5", "--verify", "60"],
        "timeout": 8,
    },
] * 6 + [
    {
        "test": "long_running.cpp",
        "options": ["--vertex-count", "200000", "--edge-count", "1000000",
                    "--max-time", "480", "--verify", "300"],
        "timeout": 500,
    },
]

# paths
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")
MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem")
CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")

# long running stats file
STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats")
SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])

# get number of threads
if "THREADS" in os.environ:
    THREADS = os.environ["THREADS"]
else:
    THREADS = multiprocessing.cpu_count()
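
# block until a TCP server accepts connections on the given port (probed
# with `nc`), then sleep an extra `delay` seconds to let the server settle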
def wait_for_server(port, delay=0.1):
    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while subprocess.call(cmd) != 0:
        time.sleep(0.01)
    time.sleep(delay)

# run test helper function
def run_test(args, test, options, timeout):
    print("Running test '{}'".format(test))

    # find binary
    if test.endswith(".py"):
        logging = "DEBUG" if args.verbose else "WARNING"
        binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test),
                  "--logging", logging]
    elif test.endswith(".cpp"):
        exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4])
        if not os.path.exists(exe):
            exe = os.path.join(BASE_DIR, "build_release", "tests", "stress",
                               test[:-4])
        binary = [exe]
    else:
        raise Exception("Test '{}' binary not supported!".format(test))

    # start test; note that `timeout` is expressed in minutes
    cmd = binary + ["--worker-count", str(THREADS)] + options
    start = time.time()
    ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60)
    if ret_test.returncode != 0:
        raise Exception("Test '{}' binary returned non-zero ({})!".format(
            test, ret_test.returncode))
    runtime = time.time() - start
    print("    Done after {:.3f} seconds".format(runtime))

    return runtime

# parse arguments
parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph"))
parser.add_argument("--log-file", default="")
parser.add_argument("--data-directory", default="")
parser.add_argument("--python", default=os.path.join(SCRIPT_DIR, "ve3", "bin",
                                                     "python3"), type=str)
parser.add_argument("--large-dataset", action="store_const",
                    const=True, default=False)
parser.add_argument("--use-ssl", action="store_const",
                    const=True, default=False)
parser.add_argument("--verbose", action="store_const",
                    const=True, default=False)
args = parser.parse_args()

# find memgraph binary
if not os.path.exists(args.memgraph):
    args.memgraph = os.path.join(BASE_DIR, "build_release", "memgraph")

# generate temporary SSL certs
if args.use_ssl:
    # https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively
    subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com"
    subprocess.run(["openssl", "req", "-new", "-newkey", "rsa:4096",
                    "-days", "365", "-nodes", "-x509", "-subj", subj,
                    "-keyout", KEY_FILE, "-out", CERT_FILE], check=True)

# start memgraph
cwd = os.path.dirname(args.memgraph)
cmd = [args.memgraph, "--bolt-num-workers=" + str(THREADS),
       "--storage-properties-on-edges=true",
       "--storage-snapshot-on-exit=true",
       "--storage-snapshot-interval-sec=600",
       "--storage-snapshot-retention-count=1",
       "--storage-wal-enabled=true",
       "--storage-recover-on-startup=false",
       "--query-execution-timeout-sec=600"]
if not args.verbose:
    cmd += ["--min-log-level", "1"]
if args.log_file:
    cmd += ["--log-file", args.log_file]
if args.data_directory:
    cmd += ["--data-directory", args.data_directory]
if args.use_ssl:
    cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE]
proc_mg = subprocess.Popen(cmd, cwd=cwd)
wait_for_server(7687)
assert proc_mg.poll() is None, "The database binary died prematurely!"
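# NOTE: the port wait plus the liveness assert above are what give the
# "more deterministic startup" mentioned in the commit summary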

# at exit cleanup
@atexit.register
def cleanup():
    global proc_mg
    if proc_mg.poll() is not None:
        return
    proc_mg.kill()
    proc_mg.wait()

# run tests
runtimes = {}
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
for test in dataset:
    # build the options locally instead of mutating `test["options"]`; the
    # repeated long_running entry in LARGE_DATASET is the same dict object
    # six times, so in-place appends would accumulate across runs
    options = list(test["options"])
    if args.use_ssl:
        options += ["--use-ssl"]
    runtime = run_test(args, test["test"], options, test["timeout"])
    runtimes[os.path.splitext(test["test"])[0]] = runtime

# stop memgraph
proc_mg.terminate()
ret_mg = proc_mg.wait()
if ret_mg != 0:
    raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg))

# cleanup certificates
if args.use_ssl:
    os.remove(KEY_FILE)
    os.remove(CERT_FILE)

# measurements
measurements = ""
for key, value in runtimes.items():
    measurements += "{}.runtime {}\n".format(key, value)
with open(STATS_FILE) as f:
    stats = f.read().split("\n")
measurements += "long_running.queries.executed {}\n".format(stats[0])
measurements += "long_running.queries.failed {}\n".format(stats[1])
with open(MEASUREMENTS_FILE, "w") as f:
    f.write(measurements)

print("Done!")