diff --git a/tests/macro_benchmark/harness/bolt_client.py b/tests/macro_benchmark/harness/bolt_client.py index 0a042a3e9..d29c51844 100644 --- a/tests/macro_benchmark/harness/bolt_client.py +++ b/tests/macro_benchmark/harness/bolt_client.py @@ -55,8 +55,9 @@ def _prepare_for_json(obj): return None -def _print_dict(d): - print(json.dumps(_prepare_for_json(d), indent=2)) +def _print_dict(fname, d): + with open(fname, "a") as f: + f.write(json.dumps(_prepare_for_json(d), indent=2)) def _run_query(args, query, self): @@ -71,6 +72,7 @@ _run_query.__defaults__ = (_run_query,) def main(): argp = connection_argument_parser() argp.add_argument("--num-workers", type=int, default=1) + argp.add_argument("--output", type=str) # Parse args and ensure that stdout is not polluted by argument parsing. try: @@ -78,7 +80,7 @@ def main(): with redirect_stderr(f): args = argp.parse_args() except: - _print_dict({RETURN_CODE: 1, ERROR_MSG: "Invalid cmd-line arguments"}) + _print_dict(args.output, {RETURN_CODE: 1, ERROR_MSG: "Invalid cmd-line arguments"}) sys.exit(1) queries = filter(lambda x: x.strip() != '', sys.stdin.read().split("\n")) @@ -91,7 +93,7 @@ def main(): end = time.time() delta_time = end - start - _print_dict({ + _print_dict(args.output, { RETURN_CODE: 0, WALL_TIME: (None if not queries else delta_time), "metadatas": metadatas diff --git a/tests/macro_benchmark/harness/harness.py b/tests/macro_benchmark/harness/harness.py old mode 100644 new mode 100755 index b53a0193c..1276b2bf0 --- a/tests/macro_benchmark/harness/harness.py +++ b/tests/macro_benchmark/harness/harness.py @@ -21,7 +21,8 @@ except: import jail_faker as jail APOLLO = False -from bolt_client import WALL_TIME +DIR_PATH = path.dirname(path.realpath(__file__)) +WALL_TIME = "wall_time" from perf import Perf log = logging.getLogger(__name__) @@ -131,7 +132,7 @@ class _QuerySuite: """ argp = ArgumentParser("QuerySuite.scenarios argument parser") argp.add_argument("--query-scenarios-root", default=path.join( - path.dirname(path.dirname(path.realpath(__file__))), "groups"), + DIR_PATH, "..", "groups"), dest="root") args, _ = argp.parse_known_args() log.info("Loading query scenarios from root: %s", args.root) @@ -209,8 +210,7 @@ class _QuerySuite: except: pass - if not APOLLO: - pid = runner.start() + pid = runner.start() execute("setup") # warmup phase @@ -316,7 +316,7 @@ class _BaseRunner: argp = ArgumentParser("RunnerArgumentParser") # TODO: These two options should be passed two database and client, not # only client as we are doing at the moment. - argp.add_argument("--RunnerUri", default="localhost:7687") + argp.add_argument("--RunnerUri", default="127.0.0.1:7687") argp.add_argument("--RunnerEncryptBolt", action="store_true") return argp @@ -326,9 +326,12 @@ class _BaseRunner: def execute(self, queries, num_client_workers): self.log.debug("execute('%s')", str(queries)) - client_args = [path.join(path.dirname(__file__), "bolt_client.py")] - client_args += ["--endpoint", self.args.RunnerUri] + client = path.join(DIR_PATH, "run_bolt_client") + client_args = ["--endpoint", self.args.RunnerUri] client_args += ["--num-workers", str(num_client_workers)] + output_fd, output = tempfile.mkstemp() + os.close(output_fd) + client_args += ["--output", output] if self.args.RunnerEncryptBolt: client_args.append("--ssl-enabled") queries_fd, queries_path = tempfile.mkstemp() @@ -343,7 +346,7 @@ class _BaseRunner: # TODO make the timeout configurable per query or something return_code = self.bolt_client.run_and_wait( - "python3", client_args, timeout=10000, stdin=queries_path) + client, client_args, timeout=600, stdin=queries_path) os.remove(queries_path) if return_code != 0: with open(self.bolt_client.get_stderr()) as f: @@ -352,12 +355,11 @@ class _BaseRunner: "Failed with return_code %d and stderr:\n%s", str(queries), return_code, stderr) raise Exception("BoltClient execution failed") - with open(self.bolt_client.get_stdout()) as f: + with open(output) as f: return json.loads(f.read()) def stop(self): self.log.info("stop") - self.bolt_client.send_signal(jail.SIGKILL) self.bolt_client.wait() self.database_bin.send_signal(jail.SIGTERM) self.database_bin.wait() @@ -375,12 +377,12 @@ class MemgraphRunner(_BaseRunner): self.log = logging.getLogger("MemgraphRunner") argp = self._get_argparser() argp.add_argument("--RunnerBin", - default=os.path.join(os.path.dirname(__file__), + default=os.path.join(DIR_PATH, "../../../build/memgraph")) argp.add_argument("--RunnerConfig", - default=os.path.join( - os.path.dirname(__file__), - "../../../config/benchmarking.conf")) + default=os.path.normpath(os.path.join( + DIR_PATH, + "../../../config/benchmarking.conf"))) # parse args self.log.info("Initializing Runner with arguments %r", args) self.args, _ = argp.parse_known_args(args) @@ -394,7 +396,7 @@ class MemgraphRunner(_BaseRunner): environment = os.environ.copy() environment["MEMGRAPH_CONFIG"] = self.args.RunnerConfig self.database_bin.run(self.args.RunnerBin, env=environment, - timeout=10000) + timeout=600) # TODO change to a check via SIGUSR time.sleep(1.0) return self.database_bin.get_pid() if not APOLLO else None @@ -407,12 +409,10 @@ class NeoRunner(_BaseRunner): argp = self._get_argparser() argp.add_argument( "--RunnerConfigDir", - default=path.join(path.dirname(path.realpath(__file__)), - "neo4j_config")) + default=path.join(DIR_PATH, "neo4j_config")) argp.add_argument( "--RunnerHomeDir", - default=path.join(path.dirname(path.realpath(__file__)), - "neo4j_home")) + default=path.join(DIR_PATH, "neo4j_home")) # parse args self.log.info("Initializing Runner with arguments %r", args) self.args, _ = argp.parse_known_args(args) @@ -430,7 +430,7 @@ class NeoRunner(_BaseRunner): if path.exists(neo4j_data_path): shutil.rmtree(neo4j_data_path) self.database_bin.run("/usr/share/neo4j/bin/neo4j", args=["console"], - env=environment, timeout=10000) + env=environment, timeout=600) # TODO change to a check via SIGUSR time.sleep(5.0) return self.database_bin.get_pid() if not APOLLO else None diff --git a/tests/macro_benchmark/harness/jail_faker.py b/tests/macro_benchmark/harness/jail_faker.py index 66ec3cfdd..0889e9dc4 100644 --- a/tests/macro_benchmark/harness/jail_faker.py +++ b/tests/macro_benchmark/harness/jail_faker.py @@ -14,7 +14,6 @@ from signal import * SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -TEMP_DIR = os.path.join(SCRIPT_DIR, ".temp") STORAGE_DIR = os.path.join(SCRIPT_DIR, ".storage") @@ -48,11 +47,6 @@ class Process: # create exe list exe = [binary] + args - # temporary stdout and stderr files - stdout = self._temporary_file("stdout") - stderr = self._temporary_file("stderr") - self._files = [stdout, stderr] - # set environment variables keys = ["PATH", "HOME", "USER", "LANG", "PWD"] for key in keys: @@ -64,9 +58,7 @@ class Process: # start process self._proc = subprocess.Popen(exe, env = env, cwd = cwd, - stdin = open(stdin, "r"), - stdout = open(stdout, "w"), - stderr = open(stderr, "w")) + stdin = open(stdin, "r")) def run_and_wait(self, *args, **kwargs): check = kwargs.pop("check", True) @@ -113,27 +105,12 @@ class Process: sys.stderr.write("WARNING: Trying to set memory for {} to " "{}\n".format(str(self), memory)) - # this currently isn't implemented in the real API - def get_stdout(self): - if self._proc == None or self._proc.returncode == None: - raise ProcessException - return self._files[0] - - # this currently isn't implemented in the real API - def get_stderr(self): - if self._proc == None or self._proc.returncode == None: - raise ProcessException - return self._files[1] - # WARNING: this won't be implemented in the real API def get_pid(self): if self._proc == None: raise ProcessException return self._proc.pid - def _temporary_file(self, name): - return os.path.join(TEMP_DIR, ".".join([name, str(uuid.uuid4()), "dat"])) - def _set_usage(self, val, name, only_value = False): self._usage[name] = val if only_value: return @@ -195,9 +172,6 @@ _thread.start() if not os.path.exists(STORAGE_DIR): os.mkdir(STORAGE_DIR) -if os.path.exists(TEMP_DIR): - shutil.rmtree(TEMP_DIR) -os.mkdir(TEMP_DIR) _storage_name = os.path.join(STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json") _storage_file = open(_storage_name, "w") @@ -208,7 +182,6 @@ def cleanup(): if proc._proc == None: continue proc.send_signal(SIGKILL) proc.get_status() - shutil.rmtree(TEMP_DIR) _storage_file.close() # end of private methods ------------------------------------------------------ diff --git a/tests/macro_benchmark/harness/run_bolt_client b/tests/macro_benchmark/harness/run_bolt_client new file mode 100755 index 000000000..f92c3074f --- /dev/null +++ b/tests/macro_benchmark/harness/run_bolt_client @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd $DIR +source ../ve3/bin/activate +python3 bolt_client.py $@ +exit $? diff --git a/tools/apollo_generate b/tools/apollo_generate index db8c71271..94c732db1 100755 --- a/tools/apollo_generate +++ b/tools/apollo_generate @@ -172,6 +172,16 @@ RUNS.append(generate_run("quality_assurance", commands = commands, "\.quality_assurance_status".format( BASE_DIR_NAME))) +# macro benchmark tests +macro_bench_path = os.path.join(BASE_DIR, "tests", "macro_benchmark") +stress_common = os.path.join(BASE_DIR, "tests", "stress", "common.py") +infile = create_archive("macro_benchmark", [binary_path, binary_link_path, + macro_bench_path, stress_common, config_path], cwd = WORKSPACE_DIR) +supervisor = "./{}/tests/macro_benchmark/harness/harness.py".format(BASE_DIR_NAME) +args = "QuerySuite MemgraphRunner --groups aggregation" +RUNS.append(generate_run("macro_benchmark", supervisor = supervisor, + arguments = args, infile = infile)) + # store ARCHIVES and RUNS store_metadata(OUTPUT_DIR, "archives", ARCHIVES) store_metadata(OUTPUT_DIR, "runs", RUNS)