Add parallel macro benchmark suite

Reviewers: mislav.bradac

Reviewed By: mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D922
Matej Ferencevic 2017-10-23 16:17:06 +02:00
parent d37f37ca62
commit cbf8dacc11
9 changed files with 120 additions and 76 deletions

View File

@@ -1,26 +0,0 @@
# MEMGRAPH DEFAULT BENCHMARKING CONFIG
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# directory to the folder with snapshots
--snapshot-directory=snapshots
# snapshot cycle interval
# if set to -1 the snapshooter will not run
--snapshot-cycle-sec=-1
# maximum allowed query execution time
# if set to -1 the query execution time is unlimited
--query_execution_time_sec=-1
# create snapshot disabled on db exit
--snapshot-on-exit=false
# max number of snapshots which will be kept on the disk at any point
# if set to -1 the max number of snapshots is unlimited
--snapshot-max-retained=-1
# database recovering is disabled by default
--snapshot-recover-on-startup=false
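
Note: the removed config follows the GFlags-style convention used throughout these files: one --key=value per line, "#" comments, and -1 as a sentinel meaning "disabled" or "unlimited". A rough sketch (not part of this commit; the helper and file name are illustrative) of how such a file can be read:

# Sketch: read a GFlags-style config file; "#" starts a comment and
# "-1" conventionally disables the corresponding feature.
def read_flags(path):
    flags = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip blank lines and comments
            key, _, value = line.lstrip("-").partition("=")
            flags[key] = value
    return flags

# flags = read_flags("config/benchmarking.conf")  # hypothetical path
# snapshots_on = int(flags.get("snapshot-cycle-sec", "-1")) != -1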

View File

@@ -16,12 +16,14 @@ except:
# This could be a function rather than a class, but we want to reuse the
# jail process since only 8 of them can be instantiated.
class QueryClient:
def __init__(self, args):
def __init__(self, args, default_num_workers):
self.log = logging.getLogger("QueryClient")
self.client = jail.get_process()
set_cpus("client-cpu-ids", self.client, args)
self.default_num_workers = default_num_workers
def __call__(self, queries, database, num_client_workers):
def __call__(self, queries, database, num_workers=None):
if num_workers is None: num_workers = self.default_num_workers
self.log.debug("execute('%s')", str(queries))
client_path = "tests/macro_benchmark/query_client"
@@ -46,7 +48,7 @@ class QueryClient:
os.close(output_fd)
client_args = ["--port", database.args.port,
"--num-workers", str(num_client_workers),
"--num-workers", str(num_workers),
"--output", output]
cpu_time_start = database.database_bin.get_usage()["cpu"]
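
Note: the reworked client stores a suite-wide default worker count and falls back to it whenever __call__ gets no explicit value. A minimal sketch of that pattern, with the jail and database plumbing stubbed out (class and method names here are illustrative):

# Minimal sketch of the default-fallback pattern; the jail/database
# plumbing of the real QueryClient is stubbed out.
class Client:
    def __init__(self, default_num_workers):
        self.default_num_workers = default_num_workers

    def run(self, queries, num_workers=None):
        # an explicit per-call value wins; otherwise use the suite default
        if num_workers is None:
            num_workers = self.default_num_workers
        return ["--num-workers", str(num_workers)]

# Client(24).run(["RETURN 1"])                 -> ["--num-workers", "24"]
# Client(24).run(["RETURN 1"], num_workers=4)  -> ["--num-workers", "4"]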

View File

@@ -44,7 +44,7 @@ class Memgraph:
env = {"MEMGRAPH_CONFIG": self.config}
database_args = ["--port", self.args.port]
if self.num_workers:
database_args += ["--num_workers", self.num_workers]
database_args += ["--num_workers", str(self.num_workers)]
# find executable path
runner_bin = self.args.runner_bin
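
Note: the str(self.num_workers) wrapping matters because the argument list is ultimately handed to a child process, and Python's subprocess API only accepts string (or path-like) argv entries; a bare int raises TypeError. A quick illustration, assuming the list eventually reaches something like subprocess.run:

import subprocess

num_workers = 8
# argv entries must be strings: ["--num_workers", 8] would raise
# "TypeError: expected str, bytes or os.PathLike object, not int".
# "echo" stands in for the real database binary here.
subprocess.run(["echo", "--num_workers", str(num_workers)], check=True)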

View File

@@ -1,4 +1,3 @@
{
"iterations": 5,
"num_client_workers": 16
"iterations": 3
}
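
Note: with num_client_workers gone from the per-group config, the client worker count is now controlled by runner flags, and the JSON keeps only iteration counts. On the suite side this dict is read with .get() defaults, roughly as follows (a sketch, assuming the file is loaded as plain JSON; the file name is illustrative):

import json

# Sketch: how the suite reads this per-group config file.
with open("config.json") as f:
    scenario_config = json.load(f)

iterations = scenario_config.get("iterations", 1)  # 3 after this change
warmup = scenario_config.get("warmup", 2)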

View File

@@ -0,0 +1 @@
MATCH (n) DETACH DELETE n

View File

@@ -7,7 +7,7 @@ from argparse import ArgumentParser
from collections import defaultdict
import tempfile
from statistics import median
from common import get_absolute_path, WALL_TIME, CPU_TIME, MAX_MEMORY
from common import get_absolute_path, WALL_TIME, CPU_TIME, MAX_MEMORY, APOLLO
from databases import Memgraph, Neo
from clients import QueryClient
@@ -40,7 +40,7 @@ class _QuerySuite:
scenario_config = scenario.get("config")
scenario_config = next(scenario_config()) if scenario_config else {}
def execute(config_name, num_client_workers=1):
def execute(config_name, num_client_workers=None):
queries = scenario.get(config_name)
start_time = time.time()
if queries:
@@ -65,7 +65,6 @@
pass
measurements = []
measurement_lists = defaultdict(list)
# Run the whole test 3 times because memgraph is sometimes
@@ -78,7 +77,7 @@
for _ in range(min(scenario_config.get("iterations", 1),
scenario_config.get("warmup", 2))):
execute("itersetup")
execute("run", scenario_config.get("num_client_workers", 1))
execute("run")
execute("iterteardown")
# TODO per scenario/run runner configuration
@@ -89,8 +88,7 @@
# have to start and stop the client for each iteration, it would
# most likely run faster
execute("itersetup")
run_result = execute("run",
scenario_config.get("num_client_workers", 1))
run_result = execute("run")
add_measurement(run_result, iteration, CPU_TIME)
add_measurement(run_result, iteration, MAX_MEMORY)
assert len(run_result["groups"]) == 1, \
@@ -158,8 +156,8 @@ class QueryParallelSuite(_QuerySuite):
_QuerySuite.__init__(self, args)
def runners(self):
# TODO: We should use different runners which will use more threads.
return {"MemgraphRunner" : MemgraphRunner, "NeoRunner" : NeoRunner}
return {"MemgraphRunner" : MemgraphParallelRunner, "NeoRunner" :
NeoParallelRunner}
def groups(self):
return ["aggregation_parallel", "create_parallel"]
@@ -172,10 +170,10 @@ class _QueryRunner:
Execution returns benchmarking data (execution times, memory
usage etc).
"""
def __init__(self, args, database):
def __init__(self, args, database, num_client_workers):
self.log = logging.getLogger("_HarnessClientRunner")
self.database = database
self.query_client = QueryClient(args)
self.query_client = QueryClient(args, num_client_workers)
def start(self):
self.database.start()
@@ -195,13 +193,11 @@ class MemgraphRunner(_QueryRunner):
def __init__(self, args):
argp = ArgumentParser("MemgraphRunnerArgumentParser")
argp.add_argument("--runner-config", default=get_absolute_path(
"benchmarking_latency.conf", "config"),
"benchmarking.conf", "config"),
help="Path to memgraph config")
argp.add_argument("--num-workers", help="Number of workers")
self.args, remaining_args = argp.parse_known_args(args)
database = Memgraph(remaining_args, self.args.runner_config,
self.args.num_workers)
super(MemgraphRunner, self).__init__(remaining_args, database)
database = Memgraph(remaining_args, self.args.runner_config, 1)
super(MemgraphRunner, self).__init__(remaining_args, database, 1)
class NeoRunner(_QueryRunner):
@@ -216,3 +212,46 @@ class NeoRunner(_QueryRunner):
self.args, remaining_args = argp.parse_known_args(args)
database = Neo(remaining_args, self.args.runner_config)
super(NeoRunner, self).__init__(remaining_args, database)
class NeoParallelRunner(_QueryRunner):
"""
Configures neo4j database for QuerySuite execution.
"""
def __init__(self, args):
argp = ArgumentParser("NeoRunnerArgumentParser")
argp.add_argument("--runner-config",
default=get_absolute_path("config/neo4j.conf"),
help="Path to neo config file")
argp.add_argument("--num-client-workers", type=int, default=24,
help="Number of clients")
self.args, remaining_args = argp.parse_known_args(args)
assert not APOLLO or self.args.num_client_workers, \
"--num-client-workers is an obligatory flag on Apollo"
database = Neo(remaining_args, self.args.runner_config)
super(NeoParallelRunner, self).__init__(
remaining_args, database, self.args.num_client_workers)
class MemgraphParallelRunner(_QueryRunner):
"""
Configures memgraph database for QuerySuite execution.
"""
def __init__(self, args):
argp = ArgumentParser("MemgraphRunnerArgumentParser")
argp.add_argument("--runner-config", default=get_absolute_path(
"benchmarking.conf", "config"),
help="Path to memgraph config")
argp.add_argument("--num-database-workers", type=int, default=8,
help="Number of workers")
argp.add_argument("--num-client-workers", type=int, default=24,
help="Number of clients")
self.args, remaining_args = argp.parse_known_args(args)
assert not APOLLO or self.args.num_database_workers, \
"--num-database-workers is an obligatory flag on Apollo"
assert not APOLLO or self.args.num_client_workers, \
"--num-client-workers is an obligatory flag on Apollo"
database = Memgraph(remaining_args, self.args.runner_config,
self.args.num_database_workers)
super(MemgraphParallelRunner, self).__init__(
remaining_args, database, self.args.num_client_workers)
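
Note: all four runners share one flag-splitting pattern: each parses only the flags it owns via parse_known_args and forwards the leftover arguments to the database wrapper, which is what lets --num-database-workers and --num-client-workers sit on the same command line as database-level options. A self-contained sketch of the pattern (the flag names mirror the code above; the rest is illustrative):

from argparse import ArgumentParser

# Sketch of the runners' flag-splitting: consume our own flags,
# forward everything else.
argp = ArgumentParser("RunnerArgumentParser")
argp.add_argument("--num-database-workers", type=int, default=8)
argp.add_argument("--num-client-workers", type=int, default=24)
own, remaining = argp.parse_known_args(
    ["--num-database-workers", "9", "--port", "7687"])

# own.num_database_workers == 9, own.num_client_workers == 24
# remaining == ["--port", "7687"]  (handed on to the database wrapper)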

View File

@@ -188,6 +188,12 @@ MACRO_BENCHMARK_ARGS = (
"QuerySuite MemgraphRunner "
"--groups aggregation 1000_create unwind_create dense_expand match "
"--no-strict --database-cpu-ids 1 --client-cpu-ids 2")
MACRO_PARALLEL_BENCHMARK_ARGS = (
"QueryParallelSuite MemgraphRunner --groups aggregation_parallel "
"--database-cpu-ids 1 2 3 4 5 6 7 8 9 "
"--client-cpu-ids 10 11 12 13 14 15 16 17 18 19 "
"--num-database-workers 9 --num-clients-workers 30 --no-strict")
macro_bench_path = os.path.join(BASE_DIR, "tests", "macro_benchmark")
harness_client_binaries = os.path.join(BUILD_RELEASE_DIR, "tests",
"macro_benchmark")
@@ -197,9 +203,17 @@ infile = create_archive("macro_benchmark", [binary_release_path,
harness_client_binaries, postgresql_lib_dir], cwd = WORKSPACE_DIR)
supervisor = "./memgraph/tests/macro_benchmark/harness.py"
outfile_paths = "\./memgraph/tests/macro_benchmark/\.harness_summary"
RUNS.append(generate_run("macro_benchmark", supervisor = supervisor,
arguments = MACRO_BENCHMARK_ARGS, infile = infile,
RUNS.append(generate_run("macro_benchmark__query_suite",
supervisor = supervisor,
arguments = MACRO_BENCHMARK_ARGS,
infile = infile,
outfile_paths = outfile_paths))
RUNS.append(generate_run("macro_benchmark__query_parallel_suite",
supervisor = supervisor,
arguments = MACRO_PARALLEL_BENCHMARK_ARGS,
infile = infile,
outfile_paths = outfile_paths,
slave_group = "remote_20c140g"))
# macro benchmark parent tests
if mode == "diff":
@@ -221,8 +235,17 @@ if mode == "diff":
supervisor = "./parent/tests/macro_benchmark/harness.py"
args = MACRO_BENCHMARK_ARGS + " --RunnerBin " + binary_parent_path
outfile_paths = "\./parent/tests/macro_benchmark/\.harness_summary"
RUNS.append(generate_run("macro_benchmark_parent", supervisor = supervisor,
arguments = args, infile = infile, outfile_paths = outfile_paths))
RUNS.append(generate_run("macro_benchmark_parent__query_suite",
supervisor = supervisor,
arguments = MACRO_BENCHMARK_ARGS + " --RunnerBin " + binary_parent_path,
infile = infile,
outfile_paths = outfile_paths))
RUNS.append(generate_run("macro_benchmark_parent__query_parallel_suite",
supervisor = supervisor,
arguments = MACRO_PARALLEL_BENCHMARK_ARGS + " --RunnerBin " + binary_parent_path,
infile = infile,
outfile_paths = outfile_paths,
slave_group = "remote_20c140g"))
# macro benchmark comparison data process
script_path = os.path.join(BASE_DIR, "tools", "apollo",
@@ -230,9 +253,13 @@ if mode == "diff":
infile = create_archive("macro_benchmark_summary", [script_path],
cwd = WORKSPACE_DIR)
cmd = "./memgraph/tools/apollo/macro_benchmark_summary " \
"macro_benchmark/memgraph/tests/macro_benchmark/.harness_summary " \
"macro_benchmark_parent/parent/tests/macro_benchmark/.harness_summary " \
".harness_summary"
"--current " \
"macro_benchmark__query_suite/memgraph/tests/macro_benchmark/.harness_summary " \
"macro_benchmark__query_parallel_suite/memgraph/tests/macro_benchmark/.harness_summary " \
"--previous " \
"macro_benchmark_parent__query_suite/parent/tests/macro_benchmark/.harness_summary " \
"macro_benchmark_parent__query_parallel_suite/memgraph/tests/macro_benchmark/.harness_summary " \
"--output .harness_summary"
outfile_paths = "\./.harness_summary"
DATA_PROCESS.append(generate_run("macro_benchmark_summary", typ = "data process",
commands = cmd, infile = infile, outfile_paths = outfile_paths))

View File

@@ -1,4 +1,5 @@
#!/usr/bin/python3
import argparse
import os
import sys
@@ -74,7 +75,7 @@ def compare_values(data_cur, data_prev):
fmt += " //({:+.2%})//{}"
item = fmt.format(item_cur * scale, diff, sign)
else:
fmt += "//(new)// {{icon plus color=blue}}"
fmt += " //(new)// {{icon plus color=blue}}"
item = fmt.format(item_cur * scale)
performance_change = True
ret[-1].append(item)
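
Note: the fmt being extended here builds one Remarkup table cell: a value, a percentage diff, and an icon. Assuming fmt starts with a numeric placeholder such as "{:.3f}" (the leading part sits outside this hunk), the added space keeps the //(new)// marker from gluing onto the number. A toy example:

# Toy example of the cell formatting; the leading "{:.3f}" and the
# icon string are assumptions, as both sit outside this hunk.
fmt = "{:.3f}" + " //({:+.2%})//{}"
cell = fmt.format(1.234, 0.052, " {icon arrow-up color=red}")
# cell == "1.234 //(+5.20%)// {icon arrow-up color=red}"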
@@ -99,29 +100,30 @@ generate_remarkup(data)
ret += "No performance change detected.\n"
return ret
if len(sys.argv) > 4 or len(sys.argv) < 3:
print("usage: {} current_values previous_values output_file".format(sys.argv[0]))
print(" output_file is optional, if not specified the script outputs")
print(" to stdout, if set to '-' then it overwrites current_values")
sys.exit(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process macro benchmark summary.")
parser.add_argument("--current", nargs = "+", required = True,
help = "current summary files")
parser.add_argument("--previous", nargs = "+", required = True,
help = "previous summary files")
parser.add_argument("--output", default = "",
help = "output file, if not specified the script outputs to stdout")
if len(sys.argv) == 4:
infile_cur, infile_prev, outfile = sys.argv[1:]
else:
infile_cur, infile_prev = sys.argv[1:]
outfile = ""
args = parser.parse_args()
data_cur = parse_file(infile_cur)
data_prev = parse_file(infile_prev)
data_cur, data_prev = [], []
for i, current in enumerate(args.current):
off = 0 if i == 0 else 1
data_cur += parse_file(current)[off:]
for i, previous in enumerate(args.previous):
off = 0 if i == 0 else 1
data_prev += parse_file(previous)[off:]
markup = generate_remarkup(compare_values(data_cur, data_prev))
markup = generate_remarkup(compare_values(data_cur, data_prev))
if outfile == "":
sys.stdout.write(markup)
sys.exit(0)
if args.output == "":
sys.stdout.write(markup)
sys.exit(0)
if outfile == "-":
outfile = infile_cur
with open(outfile, "w") as f:
f.write(generate_remarkup(compare_values(data_cur, data_prev)))
with open(args.output, "w") as f:
f.write(markup)
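
Note: the merge loops above keep the header row only once: for every file after the first, off = 1 drops the repeated header before the rows are concatenated, so compare_values sees a single combined table per side. A toy illustration, assuming parse_file returns a list of rows with the header at index 0 (the sample rows are made up):

# Toy illustration of the header-dropping merge, assuming each parsed
# file is a list of rows whose first row is the header.
files = [
    [["group", "wall_time"], ["aggregation", "1.2"]],
    [["group", "wall_time"], ["create", "0.8"]],
]
data = []
for i, rows in enumerate(files):
    off = 0 if i == 0 else 1  # keep the header only from the first file
    data += rows[off:]
# data == [["group", "wall_time"], ["aggregation", "1.2"], ["create", "0.8"]]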