#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Continuous-integration driver for the Memgraph HA stress tests.

The script starts a local HA cluster, runs every test of the selected
dataset against it and writes the collected runtimes and query statistics
to a measurements file.  This first part holds the imports, the dataset
definitions, the filesystem paths and the beginning of the command line
definition.
"""

import argparse
import atexit
import json
import multiprocessing
import os
import subprocess
import sys
import tempfile
import time

# Every dataset entry is passed to run_test() as keyword arguments:
#   "test"    - source file name of the test (".cpp" or ".py")
#   "options" - extra command line flags handed to the test
#   "timeout" - hard limit in minutes (run_test multiplies it by 60)

# dataset calibrated for running on Apollo (total 3min)
# the first long_running entry runs for 1min
# the second long_running entry runs for 2min
SMALL_DATASET = [
    {
        "test": "ha_normal_operation_long_running.cpp",
        "options": [
            "--vertex-count", "1000",
            "--edge-count", "5000",
            "--max-time", "1",
            "--verify", "20",
        ],
        "timeout": 5,
    },
    {
        "test": "ha_normal_operation_long_running.cpp",
        "options": [
            "--vertex-count", "10000",
            "--edge-count", "50000",
            "--max-time", "2",
            "--verify", "30",
        ],
        "timeout": 5,
    },
]

# dataset calibrated for running on daily stress instance (total 2h)
# long_running runs for 2h
# NOTE(review): the small dataset runs "ha_normal_operation_long_running.cpp"
# while this one runs "long_running.cpp" - confirm that is intended.
LARGE_DATASET = [
    {
        "test": "long_running.cpp",
        "options": [
            "--vertex-count", "200000",
            "--edge-count", "1000000",
            "--max-time", "120",
            "--verify", "300",
        ],
        "timeout": 140,
    },
]

# paths
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem")
CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")

# long running stats file; only the LAST test of each dataset is told to
# write it, and the script reads it back when producing the measurements
STATS_FILE = os.path.join(SCRIPT_DIR,
                          ".ha_normal_operation_long_running_stats")
SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])

# HA related constants
CLUSTER_SIZE = 3    # number of memgraph_ha processes in the cluster
PORT = 7687         # port of the first worker; worker i uses PORT + i
RAFT_CONFIG_FILE = os.path.join(SCRIPT_DIR, "raft.json")

# parse arguments
# NOTE(review): --config and --log-file are accepted but not read anywhere
# in the visible part of this script.
parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
parser.add_argument("--memgraph",
                    default=os.path.join(BUILD_DIR, "memgraph_ha"))
parser.add_argument("--config",
                    default=os.path.join(CONFIG_DIR, "stress.conf"))
parser.add_argument("--log-file", default="")
# NOTE(review): --durability-directory is accepted but never read below; the
# cluster always uses a fresh temporary directory.
parser.add_argument("--durability-directory", default="")
parser.add_argument("--python",
                    default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"),
                    type=str)
parser.add_argument("--large-dataset", action="store_true")
parser.add_argument("--verbose", action="store_true")
parser.add_argument("--threads", type=int, default=8)
args = parser.parse_args()


# run test helper function
def run_test(args, test, options, timeout):
    """Run one stress test to completion and return its runtime in seconds.

    Args:
        args: parsed command line namespace; `python`, `verbose` and
            `threads` are read from it.
        test: file name of the test; a ".py" test is run with `args.python`
            from the script directory, a ".cpp" test is looked up as a
            binary under build/tests/stress (falling back to
            build_release/tests/stress).
        options: list of extra command line flags for the test.
        timeout: time limit in minutes.

    Raises:
        Exception: when the extension is unsupported or the test exits with
            a non-zero code.
        subprocess.TimeoutExpired: when the test exceeds `timeout` minutes.
    """
    print("Running test '{}'".format(test))

    # find binary
    if test.endswith(".py"):
        log_level = "DEBUG" if args.verbose else "WARNING"
        binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test),
                  "--logging", log_level]
    elif test.endswith(".cpp"):
        name = test[:-4]  # strip the ".cpp" suffix
        exe = os.path.join(BUILD_DIR, "tests", "stress", name)
        if not os.path.exists(exe):
            exe = os.path.join(BASE_DIR, "build_release", "tests", "stress",
                               name)
        binary = [exe]
    else:
        raise Exception("Test '{}' binary not supported!".format(test))

    # start test
    cmd = binary + ["--worker-count", str(args.threads)] + options
    start = time.time()
    ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60)

    if ret_test.returncode != 0:
        raise Exception("Test '{}' binary returned non-zero ({})!".format(
            test, ret_test.returncode))

    runtime = time.time() - start
    print(" Done after {:.3f} seconds".format(runtime))

    return runtime


# find HA memgraph binary
if not os.path.exists(args.memgraph):
    args.memgraph = os.path.join(BASE_DIR, "build_release", "memgraph_ha")

# Generate database instance flags
workers = [None] * CLUSTER_SIZE  # slot i holds the Popen object of worker i
durability_directory = tempfile.TemporaryDirectory()
coordination_config_file = tempfile.NamedTemporaryFile(mode="w")


def generate_cmd_args(worker_id):
    """Return the memgraph command line (list of str) for worker `worker_id`.

    Server ids are 1-based (`worker_id + 1`), the port is `PORT + worker_id`
    and every worker gets its own "worker<id>" subdirectory of the shared
    temporary durability directory.
    """
    cmd = [args.memgraph]
    cmd.extend(["--server-id", str(worker_id + 1)])
    cmd.extend(["--port", str(PORT + worker_id)])
    cmd.extend(["--raft-config-file", RAFT_CONFIG_FILE])
    cmd.extend(["--coordination-config-file", coordination_config_file.name])

    # generate durability directory
    dur = os.path.join(durability_directory.name, "worker" + str(worker_id))
    cmd.extend(["--durability-directory", dur])

    return cmd


def wait_for_server(port, delay=0.1, process=None):
    """Block until `nc -z` can reach 127.0.0.1:`port`.

    Args:
        port: TCP port to probe.
        delay: seconds to sleep between probes and once more after success.
        process: optional `subprocess.Popen` expected to open the port; when
            given, the wait is aborted as soon as that process has exited
            instead of probing forever.

    Raises:
        Exception: when `process` exited before the port became reachable.
    """
    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while subprocess.call(cmd) != 0:
        if process is not None and process.poll() is not None:
            raise Exception(
                "Process exited with code {} before port {} became "
                "reachable!".format(process.returncode, port))
        time.sleep(delay)
    time.sleep(delay)


def start_worker(worker_id):
    """Spawn worker `worker_id` and wait until it accepts connections.

    Raises:
        AssertionError: for an out-of-range id, an already occupied slot or
            a process that died within 0.2 seconds of being started.
        Exception: when the process dies while its port is being awaited.
    """
    # Explicit raises instead of `assert` so the checks survive `python -O`.
    if not 0 <= worker_id < CLUSTER_SIZE:
        raise AssertionError("Invalid worker ID {}".format(worker_id))
    if workers[worker_id] is not None:
        raise AssertionError(
            "Worker with id {} already exists".format(worker_id))

    workers[worker_id] = subprocess.Popen(generate_cmd_args(worker_id))

    time.sleep(0.2)
    if workers[worker_id].poll() is not None:
        raise AssertionError(
            "Worker {} process died prematurely!".format(worker_id))

    wait_for_server(PORT + worker_id, process=workers[worker_id])


# at exit cleanup
# Registered BEFORE the cluster is started so that workers which are already
# running get killed even when a later worker fails to start.
@atexit.register
def cleanup():
    """Kill every worker process that was started and is still running."""
    for proc_mg in workers:
        # a slot is still None when its worker was never started
        if proc_mg is None or proc_mg.poll() is not None:
            continue
        proc_mg.kill()
        proc_mg.wait()


# Generate coordination config file: one [id, "127.0.0.1", 10000 + i] entry
# per worker, with ids matching the --server-id values
data = [[i + 1, "127.0.0.1", 10000 + i] for i in range(CLUSTER_SIZE)]
coordination_config_file.write(json.dumps(data))
coordination_config_file.flush()

# Start Ha Memgraph cluster
for worker_id in range(CLUSTER_SIZE):
    start_worker(worker_id)

# run tests
# NOTE(review): runtimes is keyed by the test file name without extension, so
# two dataset entries with the same "test" (as in SMALL_DATASET) share a key
# and only the last runtime is kept - confirm this is intended.
runtimes = {}
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
for test in dataset:
    runtime = run_test(args, **test)
    runtimes[os.path.splitext(test["test"])[0]] = runtime

# stop memgraph
for proc_mg in workers:
    proc_mg.terminate()
    ret_mg = proc_mg.wait()
    if ret_mg != 0:
        raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg))

# measurements
lines = ["{}.runtime {}\n".format(key, value)
         for key, value in runtimes.items()]
with open(STATS_FILE) as f:
    stats = f.read().split("\n")
# the first two lines are reported as executed / failed query counts
if len(stats) < 2:
    raise Exception("Stats file '{}' is incomplete!".format(STATS_FILE))
lines.append("long_running.queries.executed {}\n".format(stats[0]))
lines.append("long_running.queries.failed {}\n".format(stats[1]))
with open(MEASUREMENTS_FILE, "w") as f:
    f.write("".join(lines))

print("Done!")