First version of apollo harness integration.
Reviewers: mislav.bradac Reviewed By: mislav.bradac Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D602
This commit is contained in:
parent
d9fbcfc1a2
commit
0f562cd043
@ -55,8 +55,9 @@ def _prepare_for_json(obj):
|
||||
return None
|
||||
|
||||
|
||||
def _print_dict(d):
|
||||
print(json.dumps(_prepare_for_json(d), indent=2))
|
||||
def _print_dict(fname, d):
|
||||
with open(fname, "a") as f:
|
||||
f.write(json.dumps(_prepare_for_json(d), indent=2))
|
||||
|
||||
|
||||
def _run_query(args, query, self):
|
||||
@ -71,6 +72,7 @@ _run_query.__defaults__ = (_run_query,)
|
||||
def main():
|
||||
argp = connection_argument_parser()
|
||||
argp.add_argument("--num-workers", type=int, default=1)
|
||||
argp.add_argument("--output", type=str)
|
||||
|
||||
# Parse args and ensure that stdout is not polluted by argument parsing.
|
||||
try:
|
||||
@ -78,7 +80,7 @@ def main():
|
||||
with redirect_stderr(f):
|
||||
args = argp.parse_args()
|
||||
except:
|
||||
_print_dict({RETURN_CODE: 1, ERROR_MSG: "Invalid cmd-line arguments"})
|
||||
_print_dict(args.output, {RETURN_CODE: 1, ERROR_MSG: "Invalid cmd-line arguments"})
|
||||
sys.exit(1)
|
||||
|
||||
queries = filter(lambda x: x.strip() != '', sys.stdin.read().split("\n"))
|
||||
@ -91,7 +93,7 @@ def main():
|
||||
end = time.time()
|
||||
delta_time = end - start
|
||||
|
||||
_print_dict({
|
||||
_print_dict(args.output, {
|
||||
RETURN_CODE: 0,
|
||||
WALL_TIME: (None if not queries else delta_time),
|
||||
"metadatas": metadatas
|
||||
|
40
tests/macro_benchmark/harness/harness.py
Normal file → Executable file
40
tests/macro_benchmark/harness/harness.py
Normal file → Executable file
@ -21,7 +21,8 @@ except:
|
||||
import jail_faker as jail
|
||||
APOLLO = False
|
||||
|
||||
from bolt_client import WALL_TIME
|
||||
DIR_PATH = path.dirname(path.realpath(__file__))
|
||||
WALL_TIME = "wall_time"
|
||||
from perf import Perf
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -131,7 +132,7 @@ class _QuerySuite:
|
||||
"""
|
||||
argp = ArgumentParser("QuerySuite.scenarios argument parser")
|
||||
argp.add_argument("--query-scenarios-root", default=path.join(
|
||||
path.dirname(path.dirname(path.realpath(__file__))), "groups"),
|
||||
DIR_PATH, "..", "groups"),
|
||||
dest="root")
|
||||
args, _ = argp.parse_known_args()
|
||||
log.info("Loading query scenarios from root: %s", args.root)
|
||||
@ -209,8 +210,7 @@ class _QuerySuite:
|
||||
except:
|
||||
pass
|
||||
|
||||
if not APOLLO:
|
||||
pid = runner.start()
|
||||
pid = runner.start()
|
||||
execute("setup")
|
||||
|
||||
# warmup phase
|
||||
@ -316,7 +316,7 @@ class _BaseRunner:
|
||||
argp = ArgumentParser("RunnerArgumentParser")
|
||||
# TODO: These two options should be passed two database and client, not
|
||||
# only client as we are doing at the moment.
|
||||
argp.add_argument("--RunnerUri", default="localhost:7687")
|
||||
argp.add_argument("--RunnerUri", default="127.0.0.1:7687")
|
||||
argp.add_argument("--RunnerEncryptBolt", action="store_true")
|
||||
return argp
|
||||
|
||||
@ -326,9 +326,12 @@ class _BaseRunner:
|
||||
|
||||
def execute(self, queries, num_client_workers):
|
||||
self.log.debug("execute('%s')", str(queries))
|
||||
client_args = [path.join(path.dirname(__file__), "bolt_client.py")]
|
||||
client_args += ["--endpoint", self.args.RunnerUri]
|
||||
client = path.join(DIR_PATH, "run_bolt_client")
|
||||
client_args = ["--endpoint", self.args.RunnerUri]
|
||||
client_args += ["--num-workers", str(num_client_workers)]
|
||||
output_fd, output = tempfile.mkstemp()
|
||||
os.close(output_fd)
|
||||
client_args += ["--output", output]
|
||||
if self.args.RunnerEncryptBolt:
|
||||
client_args.append("--ssl-enabled")
|
||||
queries_fd, queries_path = tempfile.mkstemp()
|
||||
@ -343,7 +346,7 @@ class _BaseRunner:
|
||||
|
||||
# TODO make the timeout configurable per query or something
|
||||
return_code = self.bolt_client.run_and_wait(
|
||||
"python3", client_args, timeout=10000, stdin=queries_path)
|
||||
client, client_args, timeout=600, stdin=queries_path)
|
||||
os.remove(queries_path)
|
||||
if return_code != 0:
|
||||
with open(self.bolt_client.get_stderr()) as f:
|
||||
@ -352,12 +355,11 @@ class _BaseRunner:
|
||||
"Failed with return_code %d and stderr:\n%s",
|
||||
str(queries), return_code, stderr)
|
||||
raise Exception("BoltClient execution failed")
|
||||
with open(self.bolt_client.get_stdout()) as f:
|
||||
with open(output) as f:
|
||||
return json.loads(f.read())
|
||||
|
||||
def stop(self):
|
||||
self.log.info("stop")
|
||||
self.bolt_client.send_signal(jail.SIGKILL)
|
||||
self.bolt_client.wait()
|
||||
self.database_bin.send_signal(jail.SIGTERM)
|
||||
self.database_bin.wait()
|
||||
@ -375,12 +377,12 @@ class MemgraphRunner(_BaseRunner):
|
||||
self.log = logging.getLogger("MemgraphRunner")
|
||||
argp = self._get_argparser()
|
||||
argp.add_argument("--RunnerBin",
|
||||
default=os.path.join(os.path.dirname(__file__),
|
||||
default=os.path.join(DIR_PATH,
|
||||
"../../../build/memgraph"))
|
||||
argp.add_argument("--RunnerConfig",
|
||||
default=os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"../../../config/benchmarking.conf"))
|
||||
default=os.path.normpath(os.path.join(
|
||||
DIR_PATH,
|
||||
"../../../config/benchmarking.conf")))
|
||||
# parse args
|
||||
self.log.info("Initializing Runner with arguments %r", args)
|
||||
self.args, _ = argp.parse_known_args(args)
|
||||
@ -394,7 +396,7 @@ class MemgraphRunner(_BaseRunner):
|
||||
environment = os.environ.copy()
|
||||
environment["MEMGRAPH_CONFIG"] = self.args.RunnerConfig
|
||||
self.database_bin.run(self.args.RunnerBin, env=environment,
|
||||
timeout=10000)
|
||||
timeout=600)
|
||||
# TODO change to a check via SIGUSR
|
||||
time.sleep(1.0)
|
||||
return self.database_bin.get_pid() if not APOLLO else None
|
||||
@ -407,12 +409,10 @@ class NeoRunner(_BaseRunner):
|
||||
argp = self._get_argparser()
|
||||
argp.add_argument(
|
||||
"--RunnerConfigDir",
|
||||
default=path.join(path.dirname(path.realpath(__file__)),
|
||||
"neo4j_config"))
|
||||
default=path.join(DIR_PATH, "neo4j_config"))
|
||||
argp.add_argument(
|
||||
"--RunnerHomeDir",
|
||||
default=path.join(path.dirname(path.realpath(__file__)),
|
||||
"neo4j_home"))
|
||||
default=path.join(DIR_PATH, "neo4j_home"))
|
||||
# parse args
|
||||
self.log.info("Initializing Runner with arguments %r", args)
|
||||
self.args, _ = argp.parse_known_args(args)
|
||||
@ -430,7 +430,7 @@ class NeoRunner(_BaseRunner):
|
||||
if path.exists(neo4j_data_path):
|
||||
shutil.rmtree(neo4j_data_path)
|
||||
self.database_bin.run("/usr/share/neo4j/bin/neo4j", args=["console"],
|
||||
env=environment, timeout=10000)
|
||||
env=environment, timeout=600)
|
||||
# TODO change to a check via SIGUSR
|
||||
time.sleep(5.0)
|
||||
return self.database_bin.get_pid() if not APOLLO else None
|
||||
|
@ -14,7 +14,6 @@ from signal import *
|
||||
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
TEMP_DIR = os.path.join(SCRIPT_DIR, ".temp")
|
||||
STORAGE_DIR = os.path.join(SCRIPT_DIR, ".storage")
|
||||
|
||||
|
||||
@ -48,11 +47,6 @@ class Process:
|
||||
# create exe list
|
||||
exe = [binary] + args
|
||||
|
||||
# temporary stdout and stderr files
|
||||
stdout = self._temporary_file("stdout")
|
||||
stderr = self._temporary_file("stderr")
|
||||
self._files = [stdout, stderr]
|
||||
|
||||
# set environment variables
|
||||
keys = ["PATH", "HOME", "USER", "LANG", "PWD"]
|
||||
for key in keys:
|
||||
@ -64,9 +58,7 @@ class Process:
|
||||
|
||||
# start process
|
||||
self._proc = subprocess.Popen(exe, env = env, cwd = cwd,
|
||||
stdin = open(stdin, "r"),
|
||||
stdout = open(stdout, "w"),
|
||||
stderr = open(stderr, "w"))
|
||||
stdin = open(stdin, "r"))
|
||||
|
||||
def run_and_wait(self, *args, **kwargs):
|
||||
check = kwargs.pop("check", True)
|
||||
@ -113,27 +105,12 @@ class Process:
|
||||
sys.stderr.write("WARNING: Trying to set memory for {} to "
|
||||
"{}\n".format(str(self), memory))
|
||||
|
||||
# this currently isn't implemented in the real API
|
||||
def get_stdout(self):
|
||||
if self._proc == None or self._proc.returncode == None:
|
||||
raise ProcessException
|
||||
return self._files[0]
|
||||
|
||||
# this currently isn't implemented in the real API
|
||||
def get_stderr(self):
|
||||
if self._proc == None or self._proc.returncode == None:
|
||||
raise ProcessException
|
||||
return self._files[1]
|
||||
|
||||
# WARNING: this won't be implemented in the real API
|
||||
def get_pid(self):
|
||||
if self._proc == None:
|
||||
raise ProcessException
|
||||
return self._proc.pid
|
||||
|
||||
def _temporary_file(self, name):
|
||||
return os.path.join(TEMP_DIR, ".".join([name, str(uuid.uuid4()), "dat"]))
|
||||
|
||||
def _set_usage(self, val, name, only_value = False):
|
||||
self._usage[name] = val
|
||||
if only_value: return
|
||||
@ -195,9 +172,6 @@ _thread.start()
|
||||
|
||||
if not os.path.exists(STORAGE_DIR):
|
||||
os.mkdir(STORAGE_DIR)
|
||||
if os.path.exists(TEMP_DIR):
|
||||
shutil.rmtree(TEMP_DIR)
|
||||
os.mkdir(TEMP_DIR)
|
||||
|
||||
_storage_name = os.path.join(STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json")
|
||||
_storage_file = open(_storage_name, "w")
|
||||
@ -208,7 +182,6 @@ def cleanup():
|
||||
if proc._proc == None: continue
|
||||
proc.send_signal(SIGKILL)
|
||||
proc.get_status()
|
||||
shutil.rmtree(TEMP_DIR)
|
||||
_storage_file.close()
|
||||
|
||||
# end of private methods ------------------------------------------------------
|
||||
|
7
tests/macro_benchmark/harness/run_bolt_client
Executable file
7
tests/macro_benchmark/harness/run_bolt_client
Executable file
@ -0,0 +1,7 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
cd $DIR
|
||||
source ../ve3/bin/activate
|
||||
python3 bolt_client.py $@
|
||||
exit $?
|
@ -172,6 +172,16 @@ RUNS.append(generate_run("quality_assurance", commands = commands,
|
||||
"\.quality_assurance_status".format(
|
||||
BASE_DIR_NAME)))
|
||||
|
||||
# macro benchmark tests
|
||||
macro_bench_path = os.path.join(BASE_DIR, "tests", "macro_benchmark")
|
||||
stress_common = os.path.join(BASE_DIR, "tests", "stress", "common.py")
|
||||
infile = create_archive("macro_benchmark", [binary_path, binary_link_path,
|
||||
macro_bench_path, stress_common, config_path], cwd = WORKSPACE_DIR)
|
||||
supervisor = "./{}/tests/macro_benchmark/harness/harness.py".format(BASE_DIR_NAME)
|
||||
args = "QuerySuite MemgraphRunner --groups aggregation"
|
||||
RUNS.append(generate_run("macro_benchmark", supervisor = supervisor,
|
||||
arguments = args, infile = infile))
|
||||
|
||||
# store ARCHIVES and RUNS
|
||||
store_metadata(OUTPUT_DIR, "archives", ARCHIVES)
|
||||
store_metadata(OUTPUT_DIR, "runs", RUNS)
|
||||
|
Loading…
Reference in New Issue
Block a user