142b1f42b1
Summary:
  * add run_pokec script because more than one step is required
  * refactor of plot_throughput script
  * move all plot scripts under tools/plot
Reviewers: mferencevic, teon.banek, mislav.bradac
Reviewed By: mferencevic
Subscribers: florijan, pullbot, buda
Differential Revision: https://phabricator.memgraph.io/D1106
283 lines · 11 KiB · Python · Executable File
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Memgraph macro benchmark harness: loads query scenarios and executes
them with the selected suite and runner."""

import logging
import os
import time
import itertools
import json
import subprocess
from argparse import ArgumentParser
from collections import OrderedDict

from common import get_absolute_path
from query_suite import QuerySuite, QueryParallelSuite
from long_running_suite import LongRunningSuite

log = logging.getLogger(__name__)

try:
    import jail
    APOLLO = True
except ImportError:
    import jail_faker as jail
    APOLLO = False


class Loader:
    """
    Loads file contents. Supported types are:
        .py - executable that prints out Cypher queries
        .cypher - contains Cypher queries in textual form
        .json - contains a configuration

    A Loader object is callable. A call to it returns a generator that
    yields the loaded data (Cypher queries, configuration). In that sense
    one Loader is reusable. The generator approach makes it possible to
    generate different queries each time a .py file is executed.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    def _queries(self, data):
        """Helper generator that splits the data into individual queries
        and filters out empty ones (e.g. after a trailing semicolon)."""
        for element in filter(
                None, map(str.strip, data.replace("\n", " ").split(";"))):
            yield element

    def __call__(self):
        """Yields queries found in the given file_path one by one."""
        log.debug("Generating queries from file_path: %s", self.file_path)
        _, extension = os.path.splitext(self.file_path)
        if extension == ".cypher":
            with open(self.file_path) as f:
                return self._queries(f.read())
        elif extension == ".py":
            return self._queries(subprocess.check_output(
                ["python3", self.file_path]).decode("ascii"))
        elif extension == ".json":
            with open(self.file_path) as f:
                return iter([json.load(f)])
        else:
            raise Exception("Unsupported filetype {}".format(extension))

    def __repr__(self):
        return "(Loader<%s>)" % self.file_path


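# A minimal usage sketch for Loader, kept as an uncalled helper so the
# harness itself is unaffected. "create_nodes.cypher" is a hypothetical
# file name used only for illustration, not something this repository
# provides.
def _example_loader_usage():
    loader = Loader("create_nodes.cypher")
    # Each call returns a fresh generator, so one Loader is reusable.
    for query in loader():
        print(query)
    # For a .json file the call would instead yield a single parsed dict.

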
def load_scenarios(args, known_keys, suite_groups):
    """
    Scans through the folder structure starting at groups_root and loads
    query scenarios.
    Expected folder structure is:
        groups_root/
            groupname1/
                config.json
                common.py
                setup.FILE_TYPE
                teardown.FILE_TYPE
                itersetup.FILE_TYPE
                iterteardown.FILE_TYPE
                scenario1.config.json
                scenario1.run.FILE_TYPE (mandatory)
                scenario1.setup.FILE_TYPE
                scenario1.teardown.FILE_TYPE
                scenario1.itersetup.FILE_TYPE
                scenario1.iterteardown.FILE_TYPE
                scenario2...
                ...
            groupname2/
                ...

    Per-scenario configs (setup, teardown, itersetup, iterteardown)
    override group configs for that scenario. Group configs must have one
    extension (.FILE_TYPE) and scenario configs must have two extensions
    (.scenario_name.FILE_TYPE). A suite doesn't need to implement all
    query steps and filetypes; see the documentation in each suite for
    the supported ones. A hypothetical layout is sketched after this
    function.

    Args:
        args: additional args parsed by this function
        known_keys: set of valid config keys, used to validate file names
        suite_groups: groups supported by the suite; others are skipped
    Return:
        {group: [(scenario_name, {config_name: Loader})]}
    """
    argp = ArgumentParser("QuerySuite.scenarios argument parser")
    argp.add_argument("--query-scenarios-root",
                      default=get_absolute_path("groups"), dest="root")
    args, _ = argp.parse_known_args(args)
    log.info("Loading query scenarios from root: %s", args.root)

    def fill_config_dict(config_dict, base, config_files):
        for config_file in config_files:
            log.debug("Processing config file %s", config_file)
            config_name = config_file.split(".")[-2]
            config_dict[config_name] = Loader(os.path.join(base, config_file))

        # Validate that the scenario does not contain any illegal keys
        # (defense against typos in file naming).
        unknown_keys = set(config_dict) - known_keys
        if unknown_keys:
            raise Exception("Unknown QuerySuite config elements: '%r'" %
                            unknown_keys)

    def dir_content(root, predicate):
        return [p for p in os.listdir(root)
                if predicate(os.path.join(root, p))]

    group_scenarios = OrderedDict()
    for group in dir_content(args.root, os.path.isdir):
        if group not in suite_groups:
            continue
        log.info("Loading group: '%s'", group)

        group_scenarios[group] = []
        # Filter out hidden files: .gitignore, ...
        files = dir_content(
            os.path.join(args.root, group),
            lambda x: os.path.isfile(x) and os.path.basename(x)[0] != ".")

        # Process the group's default config.
        group_config = {}
        fill_config_dict(group_config, os.path.join(args.root, group),
                         [f for f in files if f.count(".") == 1])

        # Group files by scenario.
        for scenario_name, scenario_files in itertools.groupby(
                filter(lambda f: f.count(".") == 2, sorted(files)),
                lambda x: x.split(".")[0]):
            log.info("Loading scenario: '%s'", scenario_name)
            scenario = dict(group_config)
            fill_config_dict(scenario, os.path.join(args.root, group),
                             scenario_files)
            group_scenarios[group].append((scenario_name, scenario))
            log.debug("Loaded config for scenario '%s'\n%r", scenario_name,
                      scenario)

    return group_scenarios


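# A hypothetical on-disk layout that load_scenarios would accept, assuming
# a group named "pokec" with a single "create" scenario (all names here are
# illustrative, not files shipped in this repository):
#
#     groups/
#         pokec/
#             config.json              # group-wide config
#             setup.cypher             # group-wide setup
#             create.run.py            # mandatory run step for "create"
#             create.itersetup.cypher  # overrides any group itersetup
#
# For that layout the function would return
#     {"pokec": [("create", {"config": ..., "setup": ..., "run": ...,
#                            "itersetup": ...})]}
# where each value is a Loader for the corresponding file.

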
def main():
    argp = ArgumentParser(description=__doc__)
    # Positional, mandatory arguments.
    argp.add_argument("suite", help="Suite to run.")
    argp.add_argument("runner", help="Engine to use.")
    # Named, optional arguments.
    argp.add_argument("--groups", nargs="+", help="Groups to run. If none "
                      "are provided, all available groups are run.")
    argp.add_argument("--scenarios", nargs="+", help="Scenarios to run. If "
                      "none are provided, all available are run.")
    argp.add_argument("--logging", default="INFO", choices=["INFO", "DEBUG"],
                      help="Logging level")
    argp.add_argument("--additional-run-fields", default={}, type=json.loads,
                      help="Additional fields to add to the 'run', in JSON")
    argp.add_argument("--no-strict", default=False, action="store_true",
                      help="Ignores nonexistent groups instead of raising "
                      "an exception")
    args, remaining_args = argp.parse_known_args()

    if args.logging:
        logging.basicConfig(level=args.logging)
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        logging.getLogger("neo4j.bolt").setLevel(logging.WARNING)
    log.info("Memgraph benchmark suite harness")
    log.info("Executing for suite '%s', runner '%s'", args.suite, args.runner)

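    # A hypothetical invocation, assuming the suite exposes a runner named
    # "MemgraphRunner" (actual runner names come from suite.runners(), so
    # this is illustrative only):
    #
    #     ./harness QuerySuite MemgraphRunner --groups pokec --logging DEBUG
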
    # Create suites.
    suites = {"QuerySuite": QuerySuite,
              "QueryParallelSuite": QueryParallelSuite,
              "LongRunningSuite": LongRunningSuite}
    if args.suite not in suites:
        raise Exception(
            "Suite '{}' isn't registered. Registered suites are: {}".format(
                args.suite, list(suites)))
    suite = suites[args.suite](remaining_args)

    # Load scenarios.
    group_scenarios = load_scenarios(
        remaining_args, suite.KNOWN_KEYS, suite.groups())
    log.info("Loaded %d groups, with a total of %d scenarios",
             len(group_scenarios),
             sum(len(x) for x in group_scenarios.values()))

    # Create the runner.
    runners = suite.runners()
    if args.runner not in runners:
        raise Exception("Runner '{}' not registered for suite '{}'".format(
            args.runner, args.suite))
    runner = runners[args.runner](remaining_args)

    # Validate groups (if provided).
    groups = []
    if args.groups:
        for group in args.groups:
            if group not in suite.groups():
                msg = "Group '{}' isn't registered for suite '{}'".format(
                    group, suite)
                if args.no_strict:
                    log.warning(msg)
                else:
                    raise Exception(msg)
            else:
                groups.append(group)
    else:
        # No groups provided, use all of the suite's groups.
        groups = suite.groups()

    # Filter scenarios.
    # TODO: enable scenario filtering on regex.
    filtered_scenarios = OrderedDict()
    for group, scenarios in group_scenarios.items():
        if group not in groups:
            log.info("Skipping group '%s'", group)
            continue
        for scenario_name, scenario in scenarios:
            if args.scenarios and scenario_name not in args.scenarios:
                continue
            filtered_scenarios[(group, scenario_name)] = scenario

    if len(filtered_scenarios) == 0:
        log.info("No scenarios to execute")
        return

    results = []

    # Run scenarios.
    log.info("Executing %d scenarios", len(filtered_scenarios))
    for (group, scenario_name), scenario in sorted(filtered_scenarios.items()):
        log.info("Executing group.scenario '%s.%s' with elements %s",
                 group, scenario_name, list(scenario.keys()))
        results.append(suite.run(scenario, group, scenario_name, runner))

    # Print the summary.
    print("\n\nMacro benchmark summary:")
    print("{}\n".format(suite.summary))

    # Save data points.
    with open(get_absolute_path(".harness_summary"), "w") as f:
        json.dump({"results": results, "headers": suite.headers}, f)

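    # Sketch of the .harness_summary content written above, assuming
    # suite.headers were ["group_name", "scenario_name", "wall_time"]
    # (the exact headers depend on the suite; "wall_time" is illustrative):
    #
    #     {"results": [{"group_name": "pokec", "scenario_name": "create",
    #                   "wall_time": {"median": 0.5}}],
    #      "headers": ["group_name", "scenario_name", "wall_time"]}
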
    # The if block is here because the results from all suites aren't
    # compatible with the export below.
    if type(suite) not in [QuerySuite, QueryParallelSuite]:
        log.warning("The results from the suite aren't compatible "
                    "with the apollo measurements export.")
        return

    # Export data points.
    with open(get_absolute_path(".apollo_measurements"), "w") as f:
        headers = list(suite.headers)
        headers.remove("group_name")
        headers.remove("scenario_name")
        for row in results:
            group, scenario = row.pop("group_name"), row.pop("scenario_name")
            for header in headers:
                f.write("{}.{}.{} {:.20f}\n".format(
                    group, scenario, header, row[header]["median"]))


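# Sketch of a line the export above would produce, assuming a group "pokec",
# a scenario "create" and a header "wall_time" whose median is 0.5 (all
# names are illustrative):
#
#     pokec.create.wall_time 0.50000000000000000000
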
if __name__ == "__main__":
    main()