memgraph/tests/macro_benchmark/clients.py

144 lines
5.4 KiB
Python
Raw Normal View History

import logging
import os
import time
import json
import tempfile
from common import get_absolute_path, WALL_TIME, CPU_TIME, MAX_MEMORY, set_cpus
log = logging.getLogger(__name__)
try:
import jail
except:
import jail_faker as jail
# This could be a function, not a class, but we want to reuse jail process since
# we can instantiate only 8 of them.
class QueryClient:
def __init__(self, args, default_num_workers):
self.log = logging.getLogger("QueryClient")
self.client = jail.get_process()
set_cpus("client-cpu-ids", self.client, args)
self.default_num_workers = default_num_workers
def __call__(self, queries, database, num_workers=None):
if num_workers is None: num_workers = self.default_num_workers
self.log.debug("execute('%s')", str(queries))
client_path = "tests/macro_benchmark/query_client"
client = get_absolute_path(client_path, "build")
if not os.path.exists(client):
# Apollo builds both debug and release binaries on diff
# so we need to use the release client if the debug one
# doesn't exist
client = get_absolute_path(client_path, "build_release")
queries_fd, queries_path = tempfile.mkstemp()
try:
queries_file = os.fdopen(queries_fd, "w")
queries_file.write("\n".join(queries))
queries_file.close()
except:
queries_file.close()
os.remove(queries_path)
raise Exception("Writing queries to temporary file failed")
output_fd, output = tempfile.mkstemp()
os.close(output_fd)
client_args = ["--port", database.args.port,
"--num-workers", str(num_workers),
"--output", output]
cpu_time_start = database.database_bin.get_usage()["cpu"]
# TODO make the timeout configurable per query or something
return_code = self.client.run_and_wait(
client, client_args, timeout=600, stdin=queries_path)
usage = database.database_bin.get_usage()
cpu_time_end = usage["cpu"]
os.remove(queries_path)
if return_code != 0:
with open(self.client.get_stderr()) as f:
stderr = f.read()
self.log.error("Error while executing queries '%s'. "
"Failed with return_code %d and stderr:\n%s",
str(queries), return_code, stderr)
raise Exception("BoltClient execution failed")
data = {"groups" : []}
with open(output) as f:
for line in f:
data["groups"].append(json.loads(line))
data[CPU_TIME] = cpu_time_end - cpu_time_start
data[MAX_MEMORY] = usage["max_memory"]
os.remove(output)
return data
class LongRunningClient:
def __init__(self, args, default_num_workers, workload):
self.log = logging.getLogger("LongRunningClient")
self.client = jail.get_process()
set_cpus("client-cpu-ids", self.client, args)
self.default_num_workers = default_num_workers
self.workload = workload
# TODO: This is quite similar to __call__ method of QueryClient. Remove
# duplication.
def __call__(self, config, database, duration, client, num_workers=None):
if num_workers is None: num_workers = self.default_num_workers
self.log.debug("execute('%s')", config)
client_path = "tests/macro_benchmark/{}".format(client)
client = get_absolute_path(client_path, "build")
if not os.path.exists(client):
# Apollo builds both debug and release binaries on diff
# so we need to use the release client if the debug one
# doesn't exist
client = get_absolute_path(client_path, "build_release")
config_fd, config_path = tempfile.mkstemp()
try:
config_file = os.fdopen(config_fd, "w")
print(json.dumps(config, indent=4), file=config_file)
config_file.close()
except:
config_file.close()
os.remove(config_path)
raise Exception("Writing config to temporary file failed")
output_fd, output = tempfile.mkstemp()
os.close(output_fd)
client_args = ["--port", database.args.port,
"--num-workers", str(num_workers),
"--output", output,
"--duration", str(duration),
"--db", database.name,
"--scenario", self.workload]
return_code = self.client.run_and_wait(
client, client_args, timeout=600, stdin=config_path)
os.remove(config_path)
if return_code != 0:
with open(self.client.get_stderr()) as f:
stderr = f.read()
self.log.error("Error while executing config '%s'. "
"Failed with return_code %d and stderr:\n%s",
str(config), return_code, stderr)
raise Exception("BoltClient execution failed")
# TODO: We shouldn't wait for process to finish to start reading output.
# We should implement periodic reading of data and stream data when it
# becomes available.
data = []
with open(output) as f:
for line in f:
data.append(json.loads(line))
os.remove(output)
return data