#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
import os
from os import path
import time
import itertools
import json
from subprocess import check_output
from argparse import ArgumentParser
from collections import OrderedDict
from collections import defaultdict
import tempfile
import shutil
from statistics import median

try:
    import jail
    APOLLO = True
except ImportError:
    import jail_faker as jail
    APOLLO = False

DIR_PATH = path.dirname(path.realpath(__file__))
WALL_TIME = "wall_time"
CPU_TIME = "cpu_time"

from perf import Perf

log = logging.getLogger(__name__)


def load_scenarios(args):
    """
    Scans through the folder structure starting at groups_root and
    loads query scenarios.
    The expected folder structure is:
        groups_root/
            groupname1/
                config.json
                common.py
                setup.FILE_TYPE
                teardown.FILE_TYPE
                itersetup.FILE_TYPE
                iterteardown.FILE_TYPE
                scenario1.config.json
                scenario1.run.FILE_TYPE  (mandatory)
                scenario1.setup.FILE_TYPE
                scenario1.teardown.FILE_TYPE
                scenario1.itersetup.FILE_TYPE
                scenario1.iterteardown.FILE_TYPE
                scenario2...
                ...
            groupname2/
                ...

    Per-query configs (setup, teardown, itersetup, iterteardown)
    override group configs for that scenario. Group configs must have one
    extension (.FILE_TYPE) and scenario configs must have two extensions
    (.scenario_name.FILE_TYPE). A suite need not implement all query steps
    and file types; see the documentation in each suite for the supported
    ones.

    Args:
        args: additional args parsed by this function
        group_paths: str, root folder that contains group folders
    Return:
        {group: [(scenario_name, {config_name: Loader})]}
    """
    argp = ArgumentParser("QuerySuite.scenarios argument parser")
    argp.add_argument("--query-scenarios-root",
                      default=path.join(DIR_PATH, "groups"),
                      dest="root")
    args, _ = argp.parse_known_args()
    log.info("Loading query scenarios from root: %s", args.root)

    def fill_config_dict(config_dict, base, config_files):
        for config_file in config_files:
            log.debug("Processing config file %s", config_file)
            config_name = config_file.split(".")[-2]
            config_dict[config_name] = QuerySuite.Loader(
                path.join(base, config_file))

        # validate that the scenario does not contain any illegal
        # keys (defense against typos in file naming)
        unknown_keys = set(config_dict) - QuerySuite.KNOWN_KEYS
        if unknown_keys:
            raise Exception("Unknown QuerySuite config elements: '%r'" %
                            unknown_keys)

    def dir_content(root, predicate):
        return [p for p in os.listdir(root)
                if predicate(path.join(root, p))]

    group_scenarios = OrderedDict()
    for group in dir_content(args.root, path.isdir):
        log.info("Loading group: '%s'", group)

        group_scenarios[group] = []
        files = dir_content(path.join(args.root, group), path.isfile)

        # process group default config
        group_config = {}
        fill_config_dict(group_config, path.join(args.root, group),
                         [f for f in files if f.count(".") == 1])

        # group files on scenario
        for scenario_name, scenario_files in itertools.groupby(
                filter(lambda f: f.count(".") == 2, sorted(files)),
                lambda x: x.split(".")[0]):
            log.info("Loading scenario: '%s'", scenario_name)
            scenario = dict(group_config)
            fill_config_dict(scenario,
                             path.join(args.root, group),
                             scenario_files)
            group_scenarios[group].append((scenario_name, scenario))
            log.debug("Loaded config for scenario '%s'\n%r", scenario_name,
                      scenario)

    return group_scenarios

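
# A minimal sketch of the structure load_scenarios() returns, assuming a
# hypothetical "match" group containing a single "vertex" scenario on disk:
#
#     OrderedDict([
#         ("match", [
#             ("vertex", {"config": <Loader>, "run": <Loader>}),
#         ]),
#     ])
#
# Every value in the per-scenario dict is a QuerySuite.Loader, so queries
# are (re)generated lazily each time a scenario step is executed.

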
class _QuerySuite:
    """
    Executes a Query-based benchmark scenario. Query-based scenarios
    consist of setup steps (Cypher queries) executed before the benchmark,
    a single Cypher query that is benchmarked, and teardown steps
    (Cypher queries) executed after the benchmark.
    """
    # what the QuerySuite can work with
    KNOWN_KEYS = {"config", "setup", "itersetup", "run", "iterteardown",
                  "teardown", "common"}
    FORMAT = ["{:>24}", "{:>28}", "{:>22}", "{:>24}", "{:>28}",
              "{:>16}", "{:>16}"]
    FULL_FORMAT = "".join(FORMAT) + "\n"
    summary = FULL_FORMAT.format(
        "group_name", "scenario_name", "query_parsing_time",
        "query_planning_time", "query_plan_execution_time",
        WALL_TIME, CPU_TIME)

    def __init__(self, args):
        argp = ArgumentParser(description=__doc__)
        argp.add_argument("--perf", help="Run perf on memgraph binary.",
                          action="store_true")
        args, _ = argp.parse_known_args(args)
        # perf is only attached when explicitly requested via --perf
        self.perf = Perf() if args.perf else None

    class Loader:
        """
        Loads file contents. Supported types are:
            .py - executable that prints out Cypher queries
            .cypher - contains Cypher queries in textual form
            .json - contains a configuration

        A Loader object is callable.
        A call to it returns a generator that yields loaded data
        (Cypher queries, configuration). In that sense one
        Loader is reusable. The generator approach makes it possible
        to generate different queries each time a .py file is executed.
        """
        def __init__(self, file_path):
            self.file_path = file_path

        def _queries(self, data):
            """ Helper function for breaking down and filtering queries """
            for element in filter(
                    None, map(str.strip, data.replace("\n", " ").split(";"))):
                yield element

        def __call__(self):
            """ Yields queries found in the given file_path one by one """
            log.debug("Generating queries from file_path: %s",
                      self.file_path)
            _, extension = path.splitext(self.file_path)
            if extension == ".cypher":
                with open(self.file_path) as f:
                    return self._queries(f.read())
            elif extension == ".py":
                return self._queries(check_output(
                    ["python3", self.file_path]).decode("ascii"))
            elif extension == ".json":
                with open(self.file_path) as f:
                    return iter([json.load(f)])
            else:
                raise Exception("Unsupported filetype {}".format(extension))

        def __repr__(self):
            return "(QuerySuite.Loader<%s>)" % self.file_path

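    # Illustrative use of Loader (the file name is hypothetical):
    #     loader = QuerySuite.Loader("groups/match/vertex.run.cypher")
    #     for query in loader():
    #         ...  # each ";"-terminated Cypher statement, one by one
    # Calling loader() again re-reads (or re-executes) the underlying file.
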
    def run(self, scenario, group_name, scenario_name, runner):
        log.debug("QuerySuite.run() with scenario: %s", scenario)
        scenario_config = scenario.get("config")
        scenario_config = next(scenario_config()) if scenario_config else {}

        def execute(config_name, num_client_workers=1):
            queries = scenario.get(config_name)
            return runner.execute(queries(), num_client_workers) if queries \
                else None

        measurements = []

        measurement_lists = defaultdict(list)

        def add_measurement(dictionary, iteration, key):
            if key in dictionary:
                measurement = {"target": key,
                               "value": float(dictionary[key]),
                               "unit": "s",
                               "type": "time",
                               "iteration": iteration}
                measurements.append(measurement)
                try:
                    measurement_lists[key].append(float(dictionary[key]))
                except (TypeError, ValueError):
                    pass

        pid = runner.start()
        execute("setup")

        # warmup phase
        for _ in range(min(scenario_config.get("iterations", 1),
                           scenario_config.get("warmup", 2))):
            execute("itersetup")
            execute("run", scenario_config.get("num_client_workers", 1))
            execute("iterteardown")

        if self.perf:
            self.perf.start(pid)

        # TODO per scenario/run runner configuration
        num_iterations = scenario_config.get("iterations", 1)
        for iteration in range(num_iterations):
            # TODO if we didn't have the itersetup it would be trivial to
            # move the iteration loop into the bolt_client script, so we
            # would not have to start and stop the client for each
            # iteration; it would most likely run faster
            execute("itersetup")
            run_result = execute("run",
                                 scenario_config.get("num_client_workers", 1))
            add_measurement(run_result, iteration, WALL_TIME)
            add_measurement(run_result, iteration, CPU_TIME)
            for measurement in ["query_parsing_time",
                                "query_plan_execution_time",
                                "query_planning_time"]:
                for metadata in run_result.get("metadatas", []):
                    add_measurement(metadata, iteration, measurement)
            execute("iterteardown")

        if self.perf:
            self.perf.stop()

        # TODO value outlier detection and warning across iterations
        execute("teardown")
        runner.stop()
        self.append_scenario_summary(group_name, scenario_name,
                                     measurement_lists, num_iterations)
        return measurements

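    # Each record appended to `measurements` above has this shape
    # (the values are illustrative):
    #     {"target": "wall_time", "value": 0.0123, "unit": "s",
    #      "type": "time", "iteration": 3}
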
    def append_scenario_summary(self, group_name, scenario_name,
                                measurement_lists, num_iterations):
        self.summary += self.FORMAT[0].format(group_name)
        self.summary += self.FORMAT[1].format(scenario_name)
        for i, key in enumerate(("query_parsing_time", "query_planning_time",
                                 "query_plan_execution_time", WALL_TIME,
                                 CPU_TIME)):
            if key not in measurement_lists:
                time_str = "-"
            else:
                # The median is used instead of the average to avoid the
                # effect of outliers.
                time_str = "{:.10f}".format(median(measurement_lists[key]))
            self.summary += self.FORMAT[i + 2].format(time_str)
        self.summary += "\n"

    def runners(self):
        """ Which runners can execute a QuerySuite scenario """
        raise NotImplementedError(
            "This is a base class, use one of the derived suites")

    def groups(self):
        """ Which groups can be executed by a QuerySuite scenario """
        raise NotImplementedError(
            "This is a base class, use one of the derived suites")


class QuerySuite(_QuerySuite):
    def __init__(self, args):
        _QuerySuite.__init__(self, args)

    def runners(self):
        return ["MemgraphRunner", "NeoRunner"]

    def groups(self):
        return ["1000_create", "unwind_create", "match", "expression",
                "aggregation", "return", "update", "delete"]


class QueryParallelSuite(_QuerySuite):
    def __init__(self, args):
        _QuerySuite.__init__(self, args)

    def runners(self):
        return ["MemgraphRunner", "NeoRunner"]

    def groups(self):
        return ["aggregation_parallel", "create_parallel"]


def get_common_runner_argument_parser():
    argp = ArgumentParser("CommonRunnerArgumentParser")
    argp.add_argument("--address", help="Database and client address",
                      default="127.0.0.1")
    argp.add_argument("--port", help="Database and client port",
                      default="7687")
    return argp


# Database wrappers.

class Memgraph:
    """
    Knows how to start and stop memgraph.
    """
    def __init__(self, args, cpus):
        self.log = logging.getLogger("MemgraphRunner")
        argp = ArgumentParser("MemgraphArgumentParser", add_help=False,
                              parents=[get_common_runner_argument_parser()])
        argp.add_argument("--RunnerBin",
                          default=os.path.join(DIR_PATH,
                                               "../../../build/memgraph"))
        argp.add_argument("--RunnerConfig",
                          default=os.path.normpath(os.path.join(
                              DIR_PATH,
                              "../../../config/benchmarking_latency.conf")))
        self.log.info("Initializing Runner with arguments %r", args)
        self.args, _ = argp.parse_known_args(args)
        self.database_bin = jail.get_process()
        self.database_bin.set_cpus(cpus)

    def start(self):
        self.log.info("start")
        environment = os.environ.copy()
        environment["MEMGRAPH_CONFIG"] = self.args.RunnerConfig
        database_args = ["--interface", self.args.address,
                         "--port", self.args.port]
        self.database_bin.run(self.args.RunnerBin, database_args,
                              env=environment, timeout=600)
        # TODO change to a check via SIGUSR
        time.sleep(1.0)
        return self.database_bin.get_pid() if not APOLLO else None

    def stop(self):
        self.database_bin.send_signal(jail.SIGTERM)
        self.database_bin.wait()


class Neo:
    """
    Knows how to start and stop neo4j.
    """
    def __init__(self, args, cpus):
        self.log = logging.getLogger("NeoRunner")
        argp = ArgumentParser("NeoArgumentParser", add_help=False,
                              parents=[get_common_runner_argument_parser()])
        argp.add_argument(
            "--RunnerConfigDir",
            default=path.join(DIR_PATH, "neo4j_config"))
        self.log.info("Initializing Runner with arguments %r", args)
        self.args, _ = argp.parse_known_args(args)
        if self.args.address != "127.0.0.1" or self.args.port != "7687":
            raise Exception(
                "Neo wrapper doesn't support a different address or port")
        self.database_bin = jail.get_process()
        self.database_bin.set_cpus(cpus)

    def start(self):
        self.log.info("start")
        environment = os.environ.copy()
        environment["NEO4J_CONF"] = self.args.RunnerConfigDir
        self.neo4j_home_path = tempfile.mkdtemp(dir="/dev/shm")
        environment["NEO4J_HOME"] = self.neo4j_home_path
        try:
            self.database_bin.run("/usr/share/neo4j/bin/neo4j",
                                  args=["console"],
                                  env=environment, timeout=600)
            # TODO change to a check via SIGUSR
            time.sleep(5.0)
        except Exception:
            shutil.rmtree(self.neo4j_home_path)
            raise Exception("Couldn't run neo4j")
        return self.database_bin.get_pid() if not APOLLO else None

    def stop(self):
        self.database_bin.send_signal(jail.SIGTERM)
        self.database_bin.wait()
        if path.exists(self.neo4j_home_path):
            shutil.rmtree(self.neo4j_home_path)


class _HarnessClientRunner:
    """
    Knows how to start and stop a database (backend) and a client
    frontend (bolt), and to execute a Cypher query.
    Execution returns benchmarking data (execution times, memory
    usage etc.).
    A derived class should implement the start method and initialise the
    database_bin and bolt_client members of type Process.
    """
    def __init__(self, args, database, cpus=None):
        if cpus is None:
            cpus = [2, 3]
        self.log = logging.getLogger("_HarnessClientRunner")
        self.database = database
        argp = ArgumentParser("RunnerArgumentParser", add_help=False,
                              parents=[get_common_runner_argument_parser()])
        self.args, _ = argp.parse_known_args()
        self.bolt_client = jail.get_process()
        self.bolt_client.set_cpus(cpus)

    def start(self):
        self.database.start()

    def execute(self, queries, num_client_workers):
        self.log.debug("execute('%s')", str(queries))

        client = os.path.normpath(os.path.join(
            DIR_PATH, "../../../build/tests/macro_benchmark/harness_client"))
        if not os.path.exists(client):
            # Apollo builds both debug and release binaries on diff,
            # so we need to use the release client if the debug one
            # doesn't exist
            client = os.path.normpath(os.path.join(
                DIR_PATH,
                "../../../build_release/tests/macro_benchmark/"
                "harness_client"))

        queries_fd, queries_path = tempfile.mkstemp()
        try:
            with os.fdopen(queries_fd, "w") as queries_file:
                queries_file.write("\n".join(queries))
        except Exception:
            os.remove(queries_path)
            raise Exception("Writing queries to temporary file failed")

        output_fd, output = tempfile.mkstemp()
        os.close(output_fd)

        client_args = ["--address", self.args.address,
                       "--port", self.args.port,
                       "--num-workers", str(num_client_workers),
                       "--output", output]

        cpu_time_start = self.database.database_bin.get_usage()["cpu"]
        # TODO make the timeout configurable per query or something
        return_code = self.bolt_client.run_and_wait(
            client, client_args, timeout=600, stdin=queries_path)
        cpu_time_end = self.database.database_bin.get_usage()["cpu"]
        os.remove(queries_path)
        if return_code != 0:
            with open(self.bolt_client.get_stderr()) as f:
                stderr = f.read()
            self.log.error("Error while executing queries '%s'. "
                           "Failed with return_code %d and stderr:\n%s",
                           str(queries), return_code, stderr)
            raise Exception("BoltClient execution failed")

        with open(output) as f:
            data = json.load(f)
        data[CPU_TIME] = cpu_time_end - cpu_time_start

        os.remove(output)
        return data

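    # The dict returned above mirrors the harness_client JSON output
    # (wall_time and per-query "metadatas" entries), with CPU_TIME added
    # from the database process usage measured around the client run.
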
    def stop(self):
        self.log.info("stop")
        self.bolt_client.wait()
        self.database.stop()


class MemgraphRunner(_HarnessClientRunner):
    def __init__(self, args, client_cpus=None, database_cpus=None):
        if database_cpus is None:
            database_cpus = [1]
        database = Memgraph(args, database_cpus)
        super(MemgraphRunner, self).__init__(args, database,
                                             cpus=client_cpus)


class NeoRunner(_HarnessClientRunner):
    def __init__(self, args, client_cpus=None, database_cpus=None):
        if database_cpus is None:
            database_cpus = [1]
        database = Neo(args, database_cpus)
        super(NeoRunner, self).__init__(args, database, cpus=client_cpus)


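# Standalone construction of a runner looks like this (the argument list
# is illustrative; in practice main() below forwards unparsed CLI args):
#     runner = MemgraphRunner(["--port", "7687"], client_cpus=[2, 3])
#     runner.start()
#     ...
#     runner.stop()

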
def main():
    argp = ArgumentParser(description=__doc__)
    # positional, mandatory args
    argp.add_argument("suite", help="Suite to run.")
    argp.add_argument("runner", help="Engine to use.")
    # named, optional arguments
    argp.add_argument("--groups", nargs="+", help="Groups to run. If none "
                      "are provided, all available groups are run.")
    argp.add_argument("--scenarios", nargs="+", help="Scenarios to run. If "
                      "none are provided, all available are run.")
    argp.add_argument("--logging", default="INFO", choices=["INFO", "DEBUG"],
                      help="Logging level")
    argp.add_argument("--additional-run-fields", default={}, type=json.loads,
                      help="Additional fields to add to the 'run', in JSON")
    argp.add_argument("--no-strict", default=False, action="store_true",
                      help="Ignores nonexistent groups instead of raising "
                           "an exception")
    args, remaining_args = argp.parse_known_args()

    if args.logging:
        logging.basicConfig(level=args.logging)
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        logging.getLogger("neo4j.bolt").setLevel(logging.WARNING)
    log.info("Memgraph benchmark suite harness")
    log.info("Executing for suite '%s', runner '%s'", args.suite, args.runner)

    # Create suites.
    suites = {"QuerySuite": QuerySuite,
              "QueryParallelSuite": QueryParallelSuite}
    if args.suite not in suites:
        raise Exception(
            "Suite '{}' isn't registered. Registered suites are: {}".format(
                args.suite, suites))
    suite = suites[args.suite](remaining_args)

    # Load scenarios.
    group_scenarios = load_scenarios(remaining_args)
    log.info("Loaded %d groups, with a total of %d scenarios",
             len(group_scenarios),
             sum([len(x) for x in group_scenarios.values()]))

    # Create runners.
    runners = {"MemgraphRunner": MemgraphRunner, "NeoRunner": NeoRunner}
    if args.runner not in suite.runners():
        raise Exception("Runner '{}' not registered for suite '{}'".format(
            args.runner, args.suite))
    runner = runners[args.runner](remaining_args)

    # Validate groups (if provided).
    groups = []
    if args.groups:
        for group in args.groups:
            if group not in suite.groups():
                msg = "Group '{}' isn't registered for suite '{}'".format(
                    group, suite)
                if args.no_strict:
                    log.warning(msg)
                else:
                    raise Exception(msg)
            else:
                groups.append(group)
    else:
        # No groups provided, use all of the suite's groups.
        groups = suite.groups()

    # Filter scenarios.
    # TODO enable scenario filtering on regex
    filtered_scenarios = OrderedDict()
    for group, scenarios in group_scenarios.items():
        if group not in groups:
            log.info("Skipping group '%s'", group)
            continue
        for scenario_name, scenario in scenarios:
            if args.scenarios and scenario_name not in args.scenarios:
                continue
            filtered_scenarios[(group, scenario_name)] = scenario

    if len(filtered_scenarios) == 0:
        log.info("No scenarios to execute")
        return

    # Run scenarios.
    log.info("Executing %d scenarios", len(filtered_scenarios))
    results = []
    for (group, scenario_name), scenario in sorted(filtered_scenarios.items()):
        log.info("Executing group.scenario '%s.%s' with elements %s",
                 group, scenario_name, list(scenario.keys()))
        for iter_result in suite.run(scenario, group, scenario_name, runner):
            iter_result["group"] = group
            iter_result["scenario"] = scenario_name
            results.append(iter_result)

    # Save results.
    run = dict()
    run["suite"] = args.suite
    run["runner"] = runner.__class__.__name__
    run["runner_config"] = vars(runner.args)
    run.update(args.additional_run_fields)
    for result in results:
        jail.store_data(result)

    # Print summary.
    print("\n\nMacro benchmark summary:")
    print("{}\n".format(suite.summary))
    with open(os.path.join(DIR_PATH, ".harness_summary"), "w") as f:
        print(suite.summary, file=f)


if __name__ == "__main__":
    main()