Add possibility to give extra tests arg to MGBench
This commit is contained in:
parent
787987168c
commit
1f778ba5f3
@ -26,6 +26,7 @@ import log
|
|||||||
import helpers
|
import helpers
|
||||||
import runners
|
import runners
|
||||||
import importlib
|
import importlib
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
def get_queries(gen, count):
|
def get_queries(gen, count):
|
||||||
@ -120,6 +121,7 @@ parser.add_argument(
|
|||||||
parser.add_argument("--no-properties-on-edges", action="store_true", help="disable properties on edges")
|
parser.add_argument("--no-properties-on-edges", action="store_true", help="disable properties on edges")
|
||||||
parser.add_argument("--datasets", default="datasets", help="datasets to scan")
|
parser.add_argument("--datasets", default="datasets", help="datasets to scan")
|
||||||
parser.add_argument("--datasets-path", default=".", help="path to datasets to scan")
|
parser.add_argument("--datasets-path", default=".", help="path to datasets to scan")
|
||||||
|
parser.add_argument("--test-system-args", default="")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
sys.path.append(args.datasets_path)
|
sys.path.append(args.datasets_path)
|
||||||
@ -175,9 +177,12 @@ for dataset, tests in benchmarks:
|
|||||||
dataset.prepare(cache.cache_directory("datasets", dataset.NAME, dataset.get_variant()))
|
dataset.prepare(cache.cache_directory("datasets", dataset.NAME, dataset.get_variant()))
|
||||||
|
|
||||||
# Prepare runners and import the dataset.
|
# Prepare runners and import the dataset.
|
||||||
memgraph = runners.Memgraph(args.memgraph_binary, args.temporary_directory, not args.no_properties_on_edges)
|
memgraph = runners.Memgraph(
|
||||||
|
args.memgraph_binary, args.temporary_directory, not args.no_properties_on_edges, args.test_system_args
|
||||||
|
)
|
||||||
client = runners.Client(args.client_binary, args.temporary_directory)
|
client = runners.Client(args.client_binary, args.temporary_directory)
|
||||||
memgraph.start_preparation()
|
memgraph.start_preparation()
|
||||||
|
time.sleep(5.0) # giving enough time to machine manager and all to start up
|
||||||
ret = client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
|
ret = client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
|
||||||
usage = memgraph.stop()
|
usage = memgraph.stop()
|
||||||
|
|
||||||
|
@ -40,8 +40,7 @@ def _convert_args_to_flags(*args, **kwargs):
|
|||||||
def _get_usage(pid):
|
def _get_usage(pid):
|
||||||
total_cpu = 0
|
total_cpu = 0
|
||||||
with open("/proc/{}/stat".format(pid)) as f:
|
with open("/proc/{}/stat".format(pid)) as f:
|
||||||
total_cpu = (sum(map(int, f.read().split(")")[1].split()[11:15])) /
|
total_cpu = sum(map(int, f.read().split(")")[1].split()[11:15])) / os.sysconf(os.sysconf_names["SC_CLK_TCK"])
|
||||||
os.sysconf(os.sysconf_names["SC_CLK_TCK"]))
|
|
||||||
peak_rss = 0
|
peak_rss = 0
|
||||||
with open("/proc/{}/status".format(pid)) as f:
|
with open("/proc/{}/status".format(pid)) as f:
|
||||||
for row in f:
|
for row in f:
|
||||||
@ -52,18 +51,17 @@ def _get_usage(pid):
|
|||||||
|
|
||||||
|
|
||||||
class Memgraph:
|
class Memgraph:
|
||||||
def __init__(self, memgraph_binary, temporary_dir, properties_on_edges):
|
def __init__(self, memgraph_binary, temporary_dir, properties_on_edges, extra_args):
|
||||||
self._memgraph_binary = memgraph_binary
|
self._memgraph_binary = memgraph_binary
|
||||||
self._directory = tempfile.TemporaryDirectory(dir=temporary_dir)
|
self._directory = tempfile.TemporaryDirectory(dir=temporary_dir)
|
||||||
self._properties_on_edges = properties_on_edges
|
self._properties_on_edges = properties_on_edges
|
||||||
self._proc_mg = None
|
self._proc_mg = None
|
||||||
|
self._extra_args = extra_args
|
||||||
atexit.register(self._cleanup)
|
atexit.register(self._cleanup)
|
||||||
|
|
||||||
# Determine Memgraph version
|
# Determine Memgraph version
|
||||||
ret = subprocess.run([memgraph_binary, "--version"],
|
ret = subprocess.run([memgraph_binary, "--version"], stdout=subprocess.PIPE, check=True)
|
||||||
stdout=subprocess.PIPE, check=True)
|
version = re.search(r"[0-9]+\.[0-9]+\.[0-9]+", ret.stdout.decode("utf-8")).group(0)
|
||||||
version = re.search(r"[0-9]+\.[0-9]+\.[0-9]+",
|
|
||||||
ret.stdout.decode("utf-8")).group(0)
|
|
||||||
self._memgraph_version = tuple(map(int, version.split(".")))
|
self._memgraph_version = tuple(map(int, version.split(".")))
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
@ -79,8 +77,14 @@ class Memgraph:
|
|||||||
if self._memgraph_version >= (0, 50, 0):
|
if self._memgraph_version >= (0, 50, 0):
|
||||||
kwargs["storage_properties_on_edges"] = self._properties_on_edges
|
kwargs["storage_properties_on_edges"] = self._properties_on_edges
|
||||||
else:
|
else:
|
||||||
assert self._properties_on_edges, \
|
assert self._properties_on_edges, "Older versions of Memgraph can't disable properties on edges!"
|
||||||
"Older versions of Memgraph can't disable properties on edges!"
|
|
||||||
|
if self._extra_args != "":
|
||||||
|
args_list = self._extra_args.split(" ")
|
||||||
|
assert len(args_list) % 2 == 0
|
||||||
|
for i in range(0, len(args_list) // 2):
|
||||||
|
kwargs[args_list[i]] = args_list[i + 1]
|
||||||
|
|
||||||
return _convert_args_to_flags(self._memgraph_binary, **kwargs)
|
return _convert_args_to_flags(self._memgraph_binary, **kwargs)
|
||||||
|
|
||||||
def _start(self, **kwargs):
|
def _start(self, **kwargs):
|
||||||
@ -94,8 +98,7 @@ class Memgraph:
|
|||||||
raise Exception("The database process died prematurely!")
|
raise Exception("The database process died prematurely!")
|
||||||
wait_for_server(7687)
|
wait_for_server(7687)
|
||||||
ret = self._proc_mg.poll()
|
ret = self._proc_mg.poll()
|
||||||
assert ret is None, "The database process died prematurely " \
|
assert ret is None, "The database process died prematurely " "({})!".format(ret)
|
||||||
"({})!".format(ret)
|
|
||||||
|
|
||||||
def _cleanup(self):
|
def _cleanup(self):
|
||||||
if self._proc_mg is None:
|
if self._proc_mg is None:
|
||||||
@ -121,8 +124,7 @@ class Memgraph:
|
|||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
ret, usage = self._cleanup()
|
ret, usage = self._cleanup()
|
||||||
assert ret == 0, "The database process exited with a non-zero " \
|
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
|
||||||
"status ({})!".format(ret)
|
|
||||||
return usage
|
return usage
|
||||||
|
|
||||||
|
|
||||||
@ -135,8 +137,7 @@ class Client:
|
|||||||
return _convert_args_to_flags(self._client_binary, **kwargs)
|
return _convert_args_to_flags(self._client_binary, **kwargs)
|
||||||
|
|
||||||
def execute(self, queries=None, file_path=None, num_workers=1):
|
def execute(self, queries=None, file_path=None, num_workers=1):
|
||||||
if (queries is None and file_path is None) or \
|
if (queries is None and file_path is None) or (queries is not None and file_path is not None):
|
||||||
(queries is not None and file_path is not None):
|
|
||||||
raise ValueError("Either queries or input_path must be specified!")
|
raise ValueError("Either queries or input_path must be specified!")
|
||||||
|
|
||||||
# TODO: check `file_path.endswith(".json")` to support advanced
|
# TODO: check `file_path.endswith(".json")` to support advanced
|
||||||
@ -151,8 +152,7 @@ class Client:
|
|||||||
json.dump(query, f)
|
json.dump(query, f)
|
||||||
f.write("\n")
|
f.write("\n")
|
||||||
|
|
||||||
args = self._get_args(input=file_path, num_workers=num_workers,
|
args = self._get_args(input=file_path, num_workers=num_workers, queries_json=queries_json)
|
||||||
queries_json=queries_json)
|
|
||||||
ret = subprocess.run(args, stdout=subprocess.PIPE, check=True)
|
ret = subprocess.run(args, stdout=subprocess.PIPE, check=True)
|
||||||
data = ret.stdout.decode("utf-8").strip().split("\n")
|
data = ret.stdout.decode("utf-8").strip().split("\n")
|
||||||
return list(map(json.loads, data))
|
return list(map(json.loads, data))
|
||||||
|
Loading…
Reference in New Issue
Block a user