Migrate to new jail faker

Reviewers: mferencevic

Reviewed By: mferencevic

Differential Revision: https://phabricator.memgraph.io/D599
This commit is contained in:
Mislav Bradac 2017-07-29 18:05:19 +02:00
parent 404ffdc4ea
commit 06d3629fd0
4 changed files with 108 additions and 86 deletions

View File

@ -35,3 +35,6 @@
# use ast caching
--ast-cache=true
# number of workers
--num-workers=2

View File

@ -0,0 +1 @@
.storage/

View File

@ -14,7 +14,13 @@ from collections import defaultdict
import tempfile
import shutil
import jail_faker as jail
try:
import jail
APOLLO = True
except:
import jail_faker as jail
APOLLO = False
from bolt_client import WALL_TIME
from perf import Perf
@ -202,7 +208,8 @@ class _QuerySuite:
except:
pass
pid = runner.start()
if not APOLLO:
pid = runner.start()
execute("setup")
# warmup phase
@ -351,7 +358,7 @@ class _BaseRunner:
self.log.info("stop")
self.bolt_client.send_signal(jail.SIGKILL)
self.bolt_client.wait()
self.database_bin.send_signal(jail.SIGKILL)
self.database_bin.send_signal(jail.SIGTERM)
self.database_bin.wait()
@ -377,7 +384,9 @@ class MemgraphRunner(_BaseRunner):
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.database_bin = jail.get_process()
self.database_bin.set_cpus([1])
self.bolt_client = jail.get_process()
self.bolt_client.set_cpus([2, 3])
def start(self):
self.log.info("start")
@ -387,7 +396,7 @@ class MemgraphRunner(_BaseRunner):
timeout=10000)
# TODO change to a check via SIGUSR
time.sleep(1.0)
return self.database_bin.get_pid()
return self.database_bin.get_pid() if not APOLLO else None
class NeoRunner(_BaseRunner):
@ -407,7 +416,9 @@ class NeoRunner(_BaseRunner):
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.database_bin = jail.get_process()
self.database_bin.set_cpus([1])
self.bolt_client = jail.get_process()
self.bolt_client.set_cpus([2, 3])
def start(self):
self.log.info("start")
@ -421,7 +432,7 @@ class NeoRunner(_BaseRunner):
env=environment, timeout=10000)
# TODO change to a check via SIGUSR
time.sleep(5.0)
return self.database_bin.get_pid()
return self.database_bin.get_pid() if not APOLLO else None
def parse_known_args():
@ -515,7 +526,7 @@ def main():
run["runner_config"] = vars(runner.args)
run.update(args.additional_run_fields)
for result in results:
jail.store_data(json.dumps(result))
jail.store_data(result)
print("\n\n{}\n".format(suite.summary))

View File

@ -1,6 +1,8 @@
#!/usr/bin/python3
import atexit
import json
import os
import resource
import shutil
import subprocess
import sys
@ -8,8 +10,7 @@ import threading
import time
import uuid
from signal import *
import datetime
import json
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -17,10 +18,16 @@ TEMP_DIR = os.path.join(SCRIPT_DIR, ".temp")
STORAGE_DIR = os.path.join(SCRIPT_DIR, ".storage")
class ProcessException(Exception):
pass
class StorageException(Exception):
pass
class Process:
def __init__(self, tid):
self._tid = tid
@ -30,9 +37,9 @@ class Process:
self._usage = {}
self._files = []
def run(self, binary, args=[], env={}, timeout=10000, stdin="/dev/null"):
def run(self, binary, args = [], env = {}, timeout = 120, stdin = "/dev/null", cwd = "."):
# don't start a new process if one is already running
if self._proc is not None and self._proc.returncode is None:
if self._proc != None and self._proc.returncode == None:
raise ProcessException
# clear previous usage
@ -56,30 +63,23 @@ class Process:
self._timeout = timeout
# start process
self._proc = subprocess.Popen(exe, env = env,
self._proc = subprocess.Popen(exe, env = env, cwd = cwd,
stdin = open(stdin, "r"),
stdout = open(stdout, "w"),
stderr = open(stderr, "w"))
def wait(self):
if self._proc == None:
raise ProcessException()
self._proc.wait()
return self._proc.returncode
def run_and_wait(self, *args, **kwargs):
check = kwargs.pop("check", True)
self.run(*args, **kwargs)
return self.wait()
def get_pid(self):
def wait(self, check = True):
if self._proc == None:
raise ProcessException
return self._proc.pid
def get_usage(self):
if self._proc == None:
self._proc.wait()
if check and self._proc.returncode != 0:
raise ProcessException
return self._usage
return self._proc.returncode
def get_status(self):
if self._proc == None:
@ -87,20 +87,49 @@ class Process:
self._proc.poll()
return self._proc.returncode
def send_signal(self, signal):
if self._proc == None:
raise ProcessException
self._proc.send_signal(signal)
def get_usage(self):
if self._proc == None:
raise ProcessException
return self._usage
# this is implemented only in the real API
def set_cpus(self, cpus, hyper = True):
s = "out" if not hyper else ""
sys.stderr.write("WARNING: Trying to set cpus for {} to "
"{} with{} hyperthreading!\n".format(str(self), cpus, s))
# this is implemented only in the real API
def set_nproc(self, nproc):
sys.stderr.write("WARNING: Trying to set nproc for {} to "
"{}!\n".format(str(self), nproc))
# this is implemented only in the real API
def set_memory(self, memory):
sys.stderr.write("WARNING: Trying to set memory for {} to "
"{}\n".format(str(self), memory))
# this currently isn't implemented in the real API
def get_stdout(self):
if self._proc == None or self._proc.returncode == None:
raise ProcessException
return self._files[0]
# this currently isn't implemented in the real API
def get_stderr(self):
if self._proc == None or self._proc.returncode == None:
raise ProcessException
return self._files[1]
def send_signal(self, signal):
# WARNING: this won't be implemented in the real API
def get_pid(self):
if self._proc == None:
raise ProcessException
self._proc.send_signal(signal)
return self._proc.pid
def _temporary_file(self, name):
return os.path.join(TEMP_DIR, ".".join([name, str(uuid.uuid4()), "dat"]))
@ -131,7 +160,7 @@ class Process:
return
# for a description of these fields see: man proc; man times
cpu_time = sum(map(lambda x: int(x) / self._ticks_per_sec, data_stat[13:17]))
self._set_usage(cpu_time, "cpu_usage", only_value = True)
self._set_usage(cpu_time, "cpu", only_value = True)
self._set_usage(int(data_stat[19]), "threads")
mem_vm, mem_res, mem_shr = map(lambda x: int(x) * self._page_size // 1024, data_statm[:3])
self._set_usage(mem_res, "memory")
@ -170,7 +199,8 @@ if os.path.exists(TEMP_DIR):
shutil.rmtree(TEMP_DIR)
os.mkdir(TEMP_DIR)
_storage_file = os.path.join(STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json")
_storage_name = os.path.join(STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json")
_storage_file = open(_storage_name, "w")
@atexit.register
def cleanup():
@ -179,6 +209,7 @@ def cleanup():
proc.send_signal(SIGKILL)
proc.get_status()
shutil.rmtree(TEMP_DIR)
_storage_file.close()
# end of private methods ------------------------------------------------------
@ -191,68 +222,44 @@ def get_process():
return proc
return None
# TODO: ovo treba napravit
def store_data(_data, self):
data = json.loads(_data)
assert "unit" in data, "unit is nonoptional field"
assert "type" in data, "type is nonoptional field"
assert "value" in data, "value is nonoptional field"
if not hasattr(self, "timestamp"):
self.timestamp = datetime.datetime.now().isoformat()
with open(os.path.join(os.path.dirname(__file__), "results",
self.timestamp), "a") as results_file:
json.dump(data, results_file)
results_file.write("\n")
def get_host_info():
with open("/proc/meminfo") as f:
memdata = f.read()
memory = 0
for row in memdata.split("\n"):
tmp = row.split()
if tmp[0] == "MemTotal:":
memory = int(tmp[1])
break
# TODO: treba assertat da data ima neke keyeve u sebi
# TODO: to trebaju bit keyevi value, type i sl...
with open("/proc/cpuinfo") as f:
cpudata = f.read()
threads, cpus = 0, set()
for row in cpudata.split("\n\n"):
if not row: continue
data = row.split("\n")
core_id, physical_id = -1, -1
for line in data:
name, val = map(lambda x: x.strip(), line.split(":"))
if name == "physical id": physical_id = int(val)
elif name == "core id": core_id = int(val)
threads += 1
cpus.add((core_id, physical_id))
cpus = len(cpus)
# unit - obavezno
# type - obavezno
# target - (setup, tardown, ...)
# iter
# value - obavezno
# scenario
# group
# status ?
# cpu_clock - obavezno?
hyper = True if cpus != threads else False
# timestamp - obavezno, automatski
# query
# engine
# engine_config
return {"cpus": cpus, "memory": memory, "hyperthreading": hyper,
"threads": threads}
# TODO: store-aj ovo kao validni json
# to znaci da treba bit lista dictionary-ja
pass
store_data.__defaults__ = (store_data,)
if __name__ == "__main__":
proc = get_process()
proc.run("/home/matej/memgraph/jail/api/tester", timeout = 1500)
#proc.run("/home/matej/memgraph/memgraph/build/memgraph_829_97638d3_dev_debug", env={"MEMGRAPH_CONFIG": "/home/matej/memgraph/memgraph/config/memgraph.yaml"})
time.sleep( 1.000 )
print("usage1:", proc.get_usage())
print("status:", proc.get_status())
#proc.send_signal(SIGTERM)
cnt = 0
while proc.get_status() == None:
usage = proc.get_usage()
usage_str = " "
for key in sorted(usage.keys()):
usage_str += key + (": %10.3f" % usage[key]) + "; "
print(usage_str)
time.sleep( 0.1 )
cnt += 1
proc.send_signal(SIGTERM)
print("status", proc.get_status())
print("usage2", proc.get_usage())
while proc.get_status() == None:
print("cekam da umre...")
time.sleep( 0.1 )
print("stdout 3:", proc.get_stdout())
def store_data(data):
if not type(data) == dict:
raise StorageException("Data must be a dictionary!")
for i in ["unit", "type", "value"]:
if not i in data:
raise StorageException("Field '{}' missing in data!".format(i))
data["timestamp"] = time.time()
print("STORE DATA:", data)
_storage_file.write(json.dumps(data) + "\n")