#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
import os
import time
import itertools
import json
import subprocess
from argparse import ArgumentParser
from collections import OrderedDict
from common import get_absolute_path
from query_suite import QuerySuite, QueryParallelSuite
from long_running_suite import LongRunningSuite

log = logging.getLogger(__name__)

# Use the real `jail` module when it can be imported and fall back to
# `jail_faker` otherwise. APOLLO records which of the two was loaded.
# `except Exception` keeps the best-effort fallback without swallowing
# KeyboardInterrupt / SystemExit the way a bare `except:` would.
try:
    import jail
    APOLLO = True
except Exception:
    import jail_faker as jail
    APOLLO = False


class Loader:
    """
    Loads file contents. Supported types are:
        .py - executable that prints out Cypher queries
        .cypher - contains Cypher queries in textual form
        .json - contains a configuration

    A Loader object is callable. A call to it returns an iterator that
    yields loaded data (Cypher queries, configuration). In that sense one
    Loader is reusable. Because a .py file is executed again on every call,
    it can generate different queries each time.
    """

    def __init__(self, file_path):
        """Remember the path of the file to load; nothing is read yet."""
        self.file_path = file_path

    def _queries(self, data):
        """
        Break `data` down into individual queries.

        Newlines are replaced with spaces, the text is split on ";" and each
        piece is stripped of surrounding whitespace. Empty pieces (for
        example the one after a trailing ";") are skipped.
        """
        for statement in data.replace("\n", " ").split(";"):
            statement = statement.strip()
            # str.strip never returns None, so the check has to be for
            # emptiness in order to actually filter anything out.
            if statement:
                yield statement

    def __call__(self):
        """
        Return an iterator over the contents of `self.file_path`.

        For .cypher and .py files the iterator yields queries one by one,
        for .json files it yields a single parsed JSON document.

        Raises:
            Exception: if the file extension is not supported.
        """
        log.debug("Generating queries from file_path: %s", self.file_path)
        _, extension = os.path.splitext(self.file_path)

        if extension == ".cypher":
            with open(self.file_path) as f:
                return self._queries(f.read())

        if extension == ".py":
            # Decode as UTF-8 (a superset of ASCII) so that query text
            # containing non-ASCII characters does not crash the loader.
            output = subprocess.check_output(["python3", self.file_path])
            return self._queries(output.decode("utf-8"))

        if extension == ".json":
            with open(self.file_path) as f:
                return iter([json.load(f)])

        raise Exception("Unsupported filetype {} ".format(extension))

    def __repr__(self):
        return "(Loader<%s>)" % self.file_path


def load_scenarios(args, known_keys, suite_groups):
    """
    Scans through folder structure starting with groups_root and
    loads query scenarios.
    Expected folder structure is:
        groups_root/
            groupname1/
                config.json
                common.py
                setup.FILE_TYPE
                teardown.FILE_TYPE
                itersetup.FILE_TYPE
                iterteardown.FILE_TYPE
                scenario1.config.json
                scenario1.run.FILE_TYPE-------(mandatory)
                scenario1.setup.FILE_TYPE
                scenario1.teardown.FILE_TYPE
                scenario1.itersetup.FILE_TYPE
                scenario1.iterteardown.FILE_TYPE
                scenario2...
                                ...
            groupname2/
                    ...

    Per query configs (setup, teardown, itersetup, iterteardown)
    override group configs for that scenario. Group configs must have one
    extension (.FILE_TYPE) and scenario configs must have 2 extensions
    (.scenario_name.FILE_TYPE). Each suite doesn't need to implement all
    query steps and filetypes. See documentation in each suite for
    supported ones.

    Args:
        args: list of command line arguments parsed by this function; only
            --query-scenarios-root is recognized, the rest is ignored.
            None makes argparse fall back to sys.argv.
        known_keys: iterable of config element names that are allowed to
            appear in a scenario.
        suite_groups: container of group names to load; directories whose
            name is not in it are skipped.
    Return:
        OrderedDict {group: [(scenario_name, {config_name: Loader}), ...]}
    Raises:
        Exception: if a config file maps to a name not in known_keys.
    """
    argp = ArgumentParser("QuerySuite.scenarios argument parser")
    argp.add_argument("--query-scenarios-root",
                      default=get_absolute_path("groups"), dest="root")
    # Parse the arguments we were given instead of silently re-reading
    # sys.argv.
    parsed, _ = argp.parse_known_args(args)
    log.info("Loading query scenarios from root: %s", parsed.root)

    def fill_config_dict(config_dict, base, config_files):
        for config_file in config_files:
            log.debug("Processing config file %s", config_file)
            # The config name is the second-to-last dot separated part:
            # "setup.cypher" -> "setup", "scenario1.run.py" -> "run".
            config_name = config_file.split(".")[-2]
            config_dict[config_name] = Loader(os.path.join(base, config_file))

        # Validate that the scenario does not contain any illegal keys (defense
        # against typos in file naming).
        unknown_keys = set(config_dict).difference(known_keys)
        if unknown_keys:
            raise Exception("Unknown QuerySuite config elements: '%r'" %
                            unknown_keys)

    def dir_content(root, predicate):
        return [p for p in os.listdir(root)
                if predicate(os.path.join(root, p))]

    def visible_file(path):
        # Filter out hidden files: .gitignore, ...
        return os.path.isfile(path) and os.path.basename(path)[0] != "."

    group_scenarios = OrderedDict()
    for group in dir_content(parsed.root, os.path.isdir):
        if group not in suite_groups:
            continue

        log.info("Loading group: '%s'", group)
        group_root = os.path.join(parsed.root, group)
        group_scenarios[group] = []
        files = dir_content(group_root, visible_file)

        # Process group default config.
        group_config = {}
        fill_config_dict(group_config, group_root,
                         [f for f in files if f.count(".") == 1])

        # Group files on scenario. groupby only merges adjacent items, hence
        # the sort.
        scenario_file_names = (f for f in sorted(files) if f.count(".") == 2)
        for scenario_name, scenario_files in itertools.groupby(
                scenario_file_names, lambda x: x.split(".")[0]):
            log.info("Loading scenario: '%s'", scenario_name)
            # Start from a copy of the group defaults so that scenario
            # specific files override them without touching other scenarios.
            scenario = dict(group_config)
            fill_config_dict(scenario, group_root, scenario_files)
            group_scenarios[group].append((scenario_name, scenario))
            log.debug("Loaded config for scenario '%s'\n%r", scenario_name,
                      scenario)

    return group_scenarios


def main():
    """
    Command line entry point of the harness.

    Parses the arguments, creates the requested suite and runner, loads and
    filters the scenarios, runs them and writes the results to the
    ".harness_summary" file and, for QuerySuite / QueryParallelSuite, to the
    ".apollo_measurements" file.

    Raises:
        Exception: if the suite or the runner is not registered, or if a
            requested group is not registered and --no-strict is not given.
    """
    argp = ArgumentParser(description=__doc__)
    # positional, mandatory args
    argp.add_argument("suite", help="Suite to run.")
    argp.add_argument("runner", help="Engine to use.")
    # named, optional arguments
    argp.add_argument("--groups", nargs="+", help="Groups to run. If none are"
                      " provided, all available grups are run.")
    argp.add_argument("--scenarios", nargs="+", help="Scenarios to run. If "
                      "none are provided, all available are run.")
    argp.add_argument("--logging", default="INFO", choices=["INFO", "DEBUG"],
                      help="Logging level")
    argp.add_argument("--additional-run-fields", default={}, type=json.loads,
                      help="Additional fields to add to the 'run', in JSON")
    argp.add_argument("--no-strict", default=False, action="store_true",
                      help="Ignores nonexisting groups instead of raising an "
                           "exception")
    # Arguments not recognized here are handed on to the suite, the runner
    # and load_scenarios.
    args, remaining_args = argp.parse_known_args()

    if args.logging:
        logging.basicConfig(level=args.logging)
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        logging.getLogger("neo4j.bolt").setLevel(logging.WARNING)
    log.info("Memgraph benchmark suite harness")
    log.info("Executing for suite '%s', runner '%s'", args.suite, args.runner)

    # Create suites.
    suites = {"QuerySuite": QuerySuite,
              "QueryParallelSuite": QueryParallelSuite,
              "LongRunningSuite": LongRunningSuite}
    if args.suite not in suites:
        raise Exception(
            "Suite '{}' isn't registered. Registered suites are: {}".format(
                args.suite, suites))
    suite = suites[args.suite](remaining_args)

    # Load scenarios.
    group_scenarios = load_scenarios(
        remaining_args, suite.KNOWN_KEYS, suite.groups())
    log.info("Loaded %d groups, with a total of %d scenarios",
             len(group_scenarios),
             sum(len(x) for x in group_scenarios.values()))

    # Create runner.
    runners = suite.runners()
    if args.runner not in runners:
        raise Exception("Runner '{}' not registered for suite '{}'".format(
            args.runner, args.suite))
    runner = runners[args.runner](remaining_args)

    # Validate groups (if provided).
    groups = []
    if args.groups:
        for group in args.groups:
            if group not in suite.groups():
                # Report the suite by name, like the runner message above.
                msg = "Group '{}' isn't registered for suite '{}'".format(
                    group, args.suite)
                if args.no_strict:
                    log.warning(msg)
                else:
                    raise Exception(msg)
            else:
                groups.append(group)
    else:
        # No groups provided, use all suite group
        groups = suite.groups()

    # Filter scenarios.
    # TODO enable scenario filtering on regex
    filtered_scenarios = OrderedDict()
    for group, scenarios in group_scenarios.items():
        if group not in groups:
            log.info("Skipping group '%s'", group)
            continue
        for scenario_name, scenario in scenarios:
            if args.scenarios and scenario_name not in args.scenarios:
                continue
            filtered_scenarios[(group, scenario_name)] = scenario

    if not filtered_scenarios:
        log.info("No scenarios to execute")
        return

    results = []

    # Run scenarios.
    log.info("Executing %d scenarios", len(filtered_scenarios))
    for (group, scenario_name), scenario in sorted(filtered_scenarios.items()):
        log.info("Executing group.scenario '%s.%s' with elements %s",
                 group, scenario_name, list(scenario.keys()))
        results.append(suite.run(scenario, group, scenario_name, runner))

    # Print summary.
    print("\n\nMacro benchmark summary:")
    print("{}\n".format(suite.summary))

    # Save data points.
    with open(get_absolute_path(".harness_summary"), "w") as f:
        json.dump({"results": results, "headers": suite.headers}, f)

    # The if block is here because the results from all suites
    # aren't compatible with the export below.
    # The exact type is compared (no isinstance), so a subclass of these two
    # suites is treated as incompatible as well.
    if type(suite) not in [QuerySuite, QueryParallelSuite]:
        log.warning("The results from the suite "
                    "aren't compatible with the apollo measurements export.")
        return

    # Export data points.
    with open(get_absolute_path(".apollo_measurements"), "w") as f:
        headers = list(suite.headers)
        headers.remove("group_name")
        headers.remove("scenario_name")
        for row in results:
            # Note: pop removes the two name fields from the result rows.
            group, scenario = row.pop("group_name"), row.pop("scenario_name")
            for header in headers:
                f.write("{}.{}.{} {:.20f}\n".format(group, scenario,
                        header, row[header]["median"]))


if __name__ == "__main__":
    main()