memgraph/tests/benchmark_infra/harness/harness.py

467 lines
18 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import os
from os import path
import requests
import time
import itertools
import json
from subprocess import check_output
from argparse import ArgumentParser
from collections import OrderedDict
from collections import defaultdict
import tempfile
import jail_faker as jail
from bolt_client import WALL_TIME
from perf import Perf
log = logging.getLogger(__name__)
class _QuerySuite:
"""
Executes a Query-based benchmark scenario. Query-based scenarios
consist of setup steps (Cypher queries) executed before the benchmark,
a single Cypher query that is benchmarked, and teardown steps
(Cypher queries) executed after the benchmark.
"""
# what the QuerySuite can work with
KNOWN_KEYS = {"config", "setup", "itersetup", "run", "iterteardown",
"teardown"}
summary = "Summary:\n{:>30}{:>30}{:>30}{:>30}{:>30}\n".format(
"scenario_name", "query_parsing_time", "query_planning_time",
"query_plan_execution_time", WALL_TIME)
def __init__(self, args):
self.perf = Perf()
argp = ArgumentParser(description=__doc__)
argp.add_argument("--perf", help="Run perf on memgraph binary.",
action="store_true")
args, _ = argp.parse_known_args(args)
self.perf = Perf() if args.perf else None
class Loader:
"""
Loads file contents. Supported types are:
.py - executable that prints out Cypher queries
.cypher - contains Cypher queries in textual form
.json - contains a configuration
A QueryLoader object is callable.
A call to it returns a generator that yields loaded data
(Cypher queries, configuration). In that sense one
QueryLoader is reusable. The generator approach makes it possible
to generated different queries each time when executing a .py file.
"""
def __init__(self, file_path):
self.file_path = file_path
def _queries(self, data):
""" Helper function for breaking down and filtering queries"""
for element in filter(
None, map(str.strip, data.replace("\n", " ").split(";"))):
yield element
def __call__(self):
""" Yields queries found in the given file_path one by one """
log.debug("Generating queries from file_path: %s",
self.file_path)
_, extension = path.splitext(self.file_path)
if extension == ".cypher":
with open(self.file_path) as f:
return self._queries(f.read())
elif extension == ".py":
return self._queries(check_output(
["python3", self.file_path]).decode("ascii"))
elif extension == ".json":
with open(self.file_path) as f:
return [json.load(f)].__iter__()
else:
raise Exception("Unsupported filetype {} ".format(extension))
def __repr__(self):
return "(QuerySuite.Loader<%s>)" % self.file_path
@staticmethod
def scenarios(args):
"""
Scans through folder structure starting with groups_root and
loads query scenarios.
Expected folder structure is:
groups_root/
groupname1/
config.json
setup.FILE_TYPE
teardown.FILE_TYPE
itersetup.FILE_TYPE
iterteardown.FILE_TYPE
scenario1.config.json
scenario1.run.FILE_TYPE-------(mandatory)
scenario1.setup.FILE_TYPE
scenario1.teardown.FILE_TYPE
scenario1.itersetup.FILE_TYPE
scenario1.iterteardown.FILE_TYPE
scenario2...
...
groupname2/
...
Per query configs (setup, teardown, itersetup, iterteardown)
override group configs for that scenario. Group configs must have one
extension (.FILE_TYPE) and
scenario configs must have 2 extensions (.scenario_name.FILE_TYPE).
See `QueryLoader` documentation to see which file types are supported.
Args:
args: additional args parsed by this function
group_paths: str, root folder that contains group folders
Return:
{group: (scenario, {config: query_generator_function})
"""
argp = ArgumentParser("QuerySuite.scenarios argument parser")
argp.add_argument("--query-scenarios-root", default=path.join(
path.dirname(path.dirname(path.realpath(__file__))), "groups"),
dest="root")
args, _ = argp.parse_known_args()
log.info("Loading query scenarios from root: %s", args.root)
def fill_config_dict(config_dict, base, config_files):
for config_file in config_files:
log.debug("Processing config file %s", config_file)
config_name = config_file.split(".")[-2]
config_dict[config_name] = QuerySuite.Loader(
path.join(base, config_file))
# validate that the scenario does not contain any illegal
# keys (defense against typos in file naming)
unknown_keys = set(config_dict) - QuerySuite.KNOWN_KEYS
if unknown_keys:
raise Exception("Unknown QuerySuite config elements: '%r'" %
unknown_keys)
def dir_content(root, predicate):
return [p for p in os.listdir(root)
if predicate(path.join(root, p))]
group_scenarios = OrderedDict()
for group in dir_content(args.root, path.isdir):
log.info("Loading group: '%s'", group)
group_scenarios[group] = []
files = dir_content(path.join(args.root, group),
path.isfile)
# process group default config
group_config = {}
fill_config_dict(group_config, path.join(args.root, group),
[f for f in files if f.count(".") == 1])
# group files on scenario
for scenario_name, scenario_files in itertools.groupby(
filter(lambda f: f.count(".") == 2, sorted(files)),
lambda x: x.split(".")[0]):
log.info("Loading scenario: '%s'", scenario_name)
scenario = dict(group_config)
fill_config_dict(scenario,
path.join(args.root, group),
scenario_files)
group_scenarios[group].append((scenario_name, scenario))
log.debug("Loaded config for scenario '%s'\n%r", scenario_name,
scenario)
return group_scenarios
def run(self, scenario, scenario_name, runner):
log.debug("QuerySuite.run() with scenario: %s", scenario)
scenario_config = scenario.get("config")
scenario_config = next(scenario_config()) if scenario_config else {}
def execute(config_name, num_client_workers=1):
queries = scenario.get(config_name)
return runner.execute(queries(), num_client_workers) if queries \
else None
measurements = []
measurement_sums = defaultdict(float)
def add_measurement(dictionary, iteration, key):
if key in dictionary:
measurement = {"target": key,
"value": float(dictionary[key]),
"unit": "s",
"type": "time",
"iteration": iteration}
measurements.append(measurement)
try:
measurement_sums[key] += float(dictionary[key])
except:
pass
pid = runner.start()
execute("setup")
# warmup phase
for _ in range(min(scenario_config.get("iterations", 1),
scenario_config.get("warmup", 3))):
execute("itersetup")
execute("run", scenario_config.get("num_client_workers", 1))
execute("iterteardown")
if self.perf:
self.perf.start(pid)
# TODO per scenario/run runner configuration
num_iterations = scenario_config.get("iterations", 1)
for iteration in range(num_iterations):
# TODO if we didn't have the itersetup it would be trivial
# to move iteration to the bolt_client script, so we would not
# have to start and stop the client for each iteration, it would
# most likely run faster
execute("itersetup")
# TODO measure CPU time (expose it from the runner)
run_result = execute("run",
scenario_config.get("num_client_workers", 1))
add_measurement(run_result, iteration, WALL_TIME)
if len(run_result.get("metadatas", [])) == 1:
add_measurement(run_result["metadatas"][0], iteration,
"query_parsing_time")
add_measurement(run_result["metadatas"][0], iteration,
"query_plan_execution_time")
add_measurement(run_result["metadatas"][0], iteration,
"query_planning_time")
execute("iterteardown")
if self.perf:
self.perf.stop()
# TODO value outlier detection and warning across iterations
execute("teardown")
runner.stop()
self.append_scenario_summary(scenario_name, measurement_sums,
num_iterations)
return measurements
def append_scenario_summary(self, scenario_name, measurement_sums,
num_iterations):
self.summary += "{:>30}".format(scenario_name)
for key in ("query_parsing_time", "query_planning_time",
"query_plan_execution_time", WALL_TIME):
if key not in measurement_sums:
time = "-"
else:
time = "{:.10f}".format(measurement_sums[key] / num_iterations)
self.summary += "{:>30}".format(time)
self.summary += "\n"
def runners(self):
""" Which runners can execute a QuerySuite scenario """
assert False, "This is a base class, use one of derived suites"
def groups(self):
""" Which groups can be executed by a QuerySuite scenario """
assert False, "This is a base class, use one of derived suites"
return ["create", "match", "expression", "aggregation", "return",
"update", "delete", "hardcoded"]
class QuerySuite(_QuerySuite):
def __init__(self, args):
_QuerySuite.__init__(self, args)
def runners(self):
return ["MemgraphRunner"]
def groups(self):
return ["create", "match", "expression", "aggregation", "return",
"update", "delete"]
class QueryParallelSuite(_QuerySuite):
def __init__(self, args):
_QuerySuite.__init__(self, args)
def runners(self):
return ["MemgraphRunner"]
def groups(self):
return ["aggregation_parallel", "create_parallel"]
class MemgraphRunner:
"""
Knows how to start and stop Memgraph (backend) some client frontent
(bolt), and execute a cypher query.
Execution returns benchmarking data (execution times, memory
usage etc).
"""
def __init__(self, args):
"""
Creates and configures MemgraphRunner.
Args:
args: args to pass to ArgumentParser
"""
log.info("Initializing MemgraphRunner with arguments %r", args)
# parse arguments
argp = ArgumentParser("MemgraphRunnerArgumentParser")
argp.add_argument("--MemgraphRunnerBin",
default=os.path.join(os.path.dirname(__file__),
"../../../build/memgraph"))
argp.add_argument("--MemgraphRunnerConfig", required=False)
argp.add_argument("--MemgraphRunnerURI", default="localhost:7687")
argp.add_argument("--MemgraphRunnerEncryptBolt", action="store_true")
self.args, _ = argp.parse_known_args(args)
self.memgraph_bin = jail.get_process()
self.bolt_client = jail.get_process()
def start(self):
log.info("MemgraphRunner.start")
environment = os.environ.copy()
if self.args.MemgraphRunnerConfig:
environment["MEMGRAPH_CONFIG"] = self.args.MemgraphRunnerConfig
self.memgraph_bin.run(self.args.MemgraphRunnerBin, env=environment)
# TODO change to a check via SIGUSR
time.sleep(1.0)
return self.memgraph_bin.get_pid()
def execute(self, queries, num_client_workers):
log.debug("MemgraphRunner.execute('%s')", str(queries))
client_args = [path.join(path.dirname(__file__), "bolt_client.py")]
client_args += ["--endpoint", self.args.MemgraphRunnerURI]
client_args += ["--num-workers", str(num_client_workers)]
if self.args.MemgraphRunnerEncryptBolt:
client_args.append("--ssl-enabled")
queries_fd, queries_path = tempfile.mkstemp()
try:
queries_file = os.fdopen(queries_fd, "w")
queries_file.write("\n".join(queries))
queries_file.close()
except:
queries_file.close()
os.remove(queries_path)
raise Exception("Writing queries to temporary file failed")
# TODO make the timeout configurable per query or something
return_code = self.bolt_client.run_and_wait(
"python3", client_args, timeout=300, stdin=queries_path)
os.remove(queries_path)
if return_code != 0:
with open(self.bolt_client.get_stderr()) as f:
stderr = f.read()
log.error("MemgraphRunner - error while executing queries '%s'. "
"Failed with return_code %d and stderr:\n%s",
str(queries), return_code, stderr)
raise Exception("BoltClient execution failed")
with open(self.bolt_client.get_stdout()) as f:
return json.loads(f.read())
def stop(self):
log.info("MemgraphRunner.stop")
self.bolt_client.send_signal(jail.SIGKILL)
self.bolt_client.wait()
self.memgraph_bin.send_signal(jail.SIGKILL)
self.memgraph_bin.wait()
def parse_known_args():
argp = ArgumentParser(description=__doc__)
# positional, mandatory args
argp.add_argument("suite", help="Suite to run.")
argp.add_argument("runner", help="Engine to use.")
# named, optional arguments
argp.add_argument("--groups", nargs="+", help="Groups to run. If none are"
" provided, all available grups are run.")
argp.add_argument("--scenarios", nargs="+", help="Scenarios to run. If "
"none are provided, all available are run.")
argp.add_argument("--logging", default="INFO", choices=["INFO", "DEBUG"],
help="Logging level")
argp.add_argument("--additional-run-fields", default={}, type=json.loads,
help="Additional fields to add to the 'run', in JSON")
return argp.parse_known_args()
def main():
args, remaining_args = parse_known_args()
if args.logging:
logging.basicConfig(level=args.logging)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("neo4j.bolt").setLevel(logging.WARNING)
log.info("Memgraph benchmark suite harness")
log.info("Executing for suite '%s', runner '%s'", args.suite, args.runner)
# Create suite
suites = {"QuerySuite": QuerySuite, "QueryParallelSuite": QueryParallelSuite}
if args.suite not in suites:
raise Exception(
"Suite '{}' isn't registered. Registered suites are: {}".format(
args.suite, suites))
suite = suites[args.suite](remaining_args)
# Load scenarios
group_scenarios = suites[args.suite].scenarios(remaining_args)
log.info("Loaded %d groups, with a total of %d scenarios",
len(group_scenarios),
sum([len(x) for x in group_scenarios.values()]))
# Create runner
runners = {"MemgraphRunner": MemgraphRunner}
# TODO if make runner argument optional, then execute all runners
if args.runner not in suite.runners():
raise Exception("Runner '{}' not registered for suite '{}'".format(
args.runner, args.suite))
runner = runners[args.runner](remaining_args)
# Validate groups (if provided)
if args.groups:
for group in args.groups:
if group not in suite.groups():
raise Exception("Group '{}' isn't registered for suite '{}'".
format(group, suite))
groups = args.groups
else:
# No groups provided, use all suite group
groups = suite.groups()
# TODO enable scenario filtering on regex
filtered_scenarios = OrderedDict()
for group, scenarios in group_scenarios.items():
if group not in groups:
log.info("Skipping group '%s'", group)
continue
for scenario_name, scenario in scenarios:
if args.scenarios and scenario_name not in args.scenarios:
continue
filtered_scenarios[(group, scenario_name)] = scenario
if len(filtered_scenarios) == 0:
log.info("No scenarios to execute")
return
log.info("Executing %d scenarios", len(filtered_scenarios))
results = []
for (group, scenario_name), scenario in filtered_scenarios.items():
log.info("Executing group.scenario '%s.%s' with elements %s",
group, scenario_name, list(scenario.keys()))
for iter_result in suite.run(scenario, scenario_name, runner):
iter_result["group"] = group
iter_result["scenario"] = scenario_name
results.append(iter_result)
run = dict()
run["suite"] = args.suite
run["runner"] = runner.__class__.__name__
run["runner_config"] = vars(runner.args)
run.update(args.additional_run_fields)
for result in results:
jail.store_data(json.dumps(result))
print("\n\n{}\n".format(suite.summary))
if __name__ == "__main__":
main()