940bf6722c
* Add Docker runner * Add Docker client * Add benchgraph.sh script * Add package script
807 lines
33 KiB
Python
Executable File
807 lines
33 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Copyright 2023 Memgraph Ltd.
|
|
#
|
|
# Use of this software is governed by the Business Source License
|
|
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
|
# License, and you may not use this file except in compliance with the Business Source License.
|
|
#
|
|
# As of the Change Date specified in that file, in accordance with
|
|
# the Business Source License, use of this software will be governed
|
|
# by the Apache License, Version 2.0, included in the file
|
|
# licenses/APL.txt.
|
|
|
|
import argparse
|
|
import json
|
|
import multiprocessing
|
|
import pathlib
|
|
import platform
|
|
import random
|
|
import sys
|
|
|
|
import helpers
|
|
import log
|
|
import runners
|
|
import setup
|
|
from benchmark_context import BenchmarkContext
|
|
from workloads import *
|
|
|
|
WITH_FINE_GRAINED_AUTHORIZATION = "with_fine_grained_authorization"
|
|
WITHOUT_FINE_GRAINED_AUTHORIZATION = "without_fine_grained_authorization"
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description="Main parser.", add_help=False)
|
|
|
|
benchmark_parser = argparse.ArgumentParser(description="Benchmark arguments parser", add_help=False)
|
|
|
|
benchmark_parser.add_argument(
|
|
"benchmarks",
|
|
nargs="*",
|
|
default=None,
|
|
help="descriptions of benchmarks that should be run; "
|
|
"multiple descriptions can be specified to run multiple "
|
|
"benchmarks; the description is specified as "
|
|
"dataset/variant/group/query; Unix shell-style wildcards "
|
|
"can be used in the descriptions; variant, group and query "
|
|
"are optional and they can be left out; the default "
|
|
"variant is '' which selects the default dataset variant; "
|
|
"the default group is '*' which selects all groups; the"
|
|
"default query is '*' which selects all queries",
|
|
)
|
|
|
|
benchmark_parser.add_argument(
|
|
"--num-workers-for-import",
|
|
type=int,
|
|
default=multiprocessing.cpu_count() // 2,
|
|
help="number of workers used to import the dataset",
|
|
)
|
|
benchmark_parser.add_argument(
|
|
"--num-workers-for-benchmark",
|
|
type=int,
|
|
default=1,
|
|
help="number of workers used to execute the benchmark",
|
|
)
|
|
benchmark_parser.add_argument(
|
|
"--single-threaded-runtime-sec",
|
|
type=int,
|
|
default=10,
|
|
help="single threaded duration of each query",
|
|
)
|
|
benchmark_parser.add_argument(
|
|
"--query-count-lower-bound",
|
|
type=int,
|
|
default=30,
|
|
help="Lower bound for query count, minimum number of queries that will be executed. If approximated --single-threaded-runtime-sec query count is lower than this value, lower bound is used.",
|
|
)
|
|
benchmark_parser.add_argument(
|
|
"--no-load-query-counts",
|
|
action="store_true",
|
|
default=False,
|
|
help="disable loading of cached query counts",
|
|
)
|
|
benchmark_parser.add_argument(
|
|
"--no-save-query-counts",
|
|
action="store_true",
|
|
default=False,
|
|
help="disable storing of cached query counts",
|
|
)
|
|
|
|
benchmark_parser.add_argument(
|
|
"--export-results",
|
|
default=None,
|
|
help="file path into which results should be exported",
|
|
)
|
|
|
|
benchmark_parser.add_argument(
|
|
"--no-authorization",
|
|
action="store_false",
|
|
default=True,
|
|
help="Run each query with authorization",
|
|
)
|
|
|
|
benchmark_parser.add_argument(
|
|
"--warm-up",
|
|
default="cold",
|
|
choices=["cold", "hot", "vulcanic"],
|
|
help="Run different warmups before benchmarks sample starts",
|
|
)
|
|
|
|
benchmark_parser.add_argument(
|
|
"--workload-realistic",
|
|
nargs="*",
|
|
type=int,
|
|
default=None,
|
|
help="""Define combination that defines the realistic workload.
|
|
Realistic workload can be run as a single configuration for all groups of queries,
|
|
Pass the positional arguments as values of what percentage of
|
|
write/read/update/analytical queries you want to have in your workload.
|
|
Example: --workload-realistic 1000 20 70 10 0 will execute 1000 queries, 20% write,
|
|
70% read, 10% update and 0% analytical.""",
|
|
)
|
|
|
|
benchmark_parser.add_argument(
|
|
"--workload-mixed",
|
|
nargs="*",
|
|
type=int,
|
|
default=None,
|
|
help="""Mixed workload can be run on each query under some defined load.
|
|
By passing one more positional argument, you are defining what percentage of that query
|
|
will be in mixed workload, and this is executed for each query. The rest of the queries will be
|
|
selected from the appropriate groups
|
|
Running --mixed-workload 1000 30 0 0 0 70, will execute each query 700 times or 70%,
|
|
with the presence of 300 write queries from write type or 30%""",
|
|
)
|
|
|
|
benchmark_parser.add_argument(
|
|
"--time-depended-execution",
|
|
type=int,
|
|
default=0,
|
|
help="Execute defined number of queries (based on single-threaded-runtime-sec) for a defined duration in of wall-clock time",
|
|
)
|
|
|
|
benchmark_parser.add_argument(
|
|
"--performance-tracking",
|
|
action="store_true",
|
|
default=False,
|
|
help="Flag for runners performance tracking, this logs RES through time and vendor specific performance tracking.",
|
|
)
|
|
|
|
benchmark_parser.add_argument("--customer-workloads", default=None, help="Path to customers workloads")
|
|
|
|
benchmark_parser.add_argument(
|
|
"--vendor-specific",
|
|
nargs="*",
|
|
default=[],
|
|
help="Vendor specific arguments that can be applied to each vendor, format: [key=value, key=value ...]",
|
|
)
|
|
|
|
subparsers = parser.add_subparsers(help="Subparsers", dest="run_option")
|
|
|
|
# Vendor native parser starts here
|
|
parser_vendor_native = subparsers.add_parser(
|
|
"vendor-native",
|
|
help="Running database in binary native form",
|
|
parents=[benchmark_parser],
|
|
)
|
|
parser_vendor_native.add_argument(
|
|
"--vendor-name",
|
|
default="memgraph",
|
|
choices=["memgraph", "neo4j"],
|
|
help="Input vendor binary name (memgraph, neo4j)",
|
|
)
|
|
parser_vendor_native.add_argument(
|
|
"--vendor-binary",
|
|
help="Vendor binary used for benchmarking, by default it is memgraph",
|
|
default=helpers.get_binary_path("memgraph"),
|
|
)
|
|
|
|
parser_vendor_native.add_argument(
|
|
"--client-binary",
|
|
default=helpers.get_binary_path("tests/mgbench/client"),
|
|
help="Client binary used for benchmarking",
|
|
)
|
|
|
|
# Vendor docker parsers starts here
|
|
parser_vendor_docker = subparsers.add_parser(
|
|
"vendor-docker", help="Running database in docker", parents=[benchmark_parser]
|
|
)
|
|
parser_vendor_docker.add_argument(
|
|
"--vendor-name",
|
|
default="memgraph",
|
|
choices=["memgraph-docker", "neo4j-docker"],
|
|
help="Input vendor name to run in docker (memgraph-docker, neo4j-docker)",
|
|
)
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
def get_queries(gen, count):
|
|
# Make the generator deterministic.
|
|
random.seed(gen.__name__)
|
|
# Generate queries.
|
|
ret = []
|
|
for i in range(count):
|
|
ret.append(gen())
|
|
return ret
|
|
|
|
|
|
def warmup(condition: str, client: runners.BaseRunner, queries: list = None):
|
|
log.init("Started warm-up procedure to match database condition: {} ".format(condition))
|
|
if condition == "hot":
|
|
log.log("Execute warm-up to match condition: {} ".format(condition))
|
|
client.execute(
|
|
queries=[
|
|
("CREATE ();", {}),
|
|
("CREATE ()-[:TempEdge]->();", {}),
|
|
("MATCH (n) RETURN count(n.prop) LIMIT 1;", {}),
|
|
],
|
|
num_workers=1,
|
|
)
|
|
elif condition == "vulcanic":
|
|
log.log("Execute warm-up to match condition: {} ".format(condition))
|
|
client.execute(queries=queries)
|
|
else:
|
|
log.log("No warm-up on condition: {} ".format(condition))
|
|
log.log("Finished warm-up procedure to match database condition: {} ".format(condition))
|
|
|
|
|
|
def mixed_workload(
|
|
vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
|
|
):
|
|
num_of_queries = benchmark_context.mode_config[0]
|
|
percentage_distribution = benchmark_context.mode_config[1:]
|
|
if sum(percentage_distribution) != 100:
|
|
raise Exception(
|
|
"Please make sure that passed arguments % sum to 100% percent!, passed: ",
|
|
percentage_distribution,
|
|
)
|
|
s = [str(i) for i in benchmark_context.mode_config]
|
|
|
|
config_distribution = "_".join(s)
|
|
|
|
log.log("Generating mixed workload...")
|
|
|
|
percentages_by_type = {
|
|
"write": percentage_distribution[0],
|
|
"read": percentage_distribution[1],
|
|
"update": percentage_distribution[2],
|
|
"analytical": percentage_distribution[3],
|
|
}
|
|
|
|
queries_by_type = {
|
|
"write": [],
|
|
"read": [],
|
|
"update": [],
|
|
"analytical": [],
|
|
}
|
|
|
|
for _, funcname in queries[group]:
|
|
for key in queries_by_type.keys():
|
|
if key in funcname:
|
|
queries_by_type[key].append(funcname)
|
|
|
|
for key, percentage in percentages_by_type.items():
|
|
if percentage != 0 and len(queries_by_type[key]) == 0:
|
|
raise Exception(
|
|
"There is a missing query in group (write, read, update or analytical) for given workload distribution."
|
|
)
|
|
|
|
random.seed(config_distribution)
|
|
|
|
# Executing mixed workload for each test
|
|
if benchmark_context.mode == "Mixed":
|
|
for query, funcname in queries[group]:
|
|
full_workload = []
|
|
|
|
log.info(
|
|
"Running query in mixed workload: {}/{}/{}".format(
|
|
group,
|
|
query,
|
|
funcname,
|
|
),
|
|
)
|
|
base_query = getattr(dataset, funcname)
|
|
|
|
base_query_type = funcname.rsplit("_", 1)[1]
|
|
|
|
if percentages_by_type.get(base_query_type, 0) > 0:
|
|
continue
|
|
|
|
options = ["write", "read", "update", "analytical", "query"]
|
|
function_type = random.choices(population=options, weights=percentage_distribution, k=num_of_queries)
|
|
|
|
for t in function_type:
|
|
# Get the apropropriate functions with same probabilty
|
|
if t == "query":
|
|
full_workload.append(base_query())
|
|
else:
|
|
funcname = random.choices(queries_by_type[t], k=1)[0]
|
|
additional_query = getattr(dataset, funcname)
|
|
full_workload.append(additional_query())
|
|
|
|
vendor.start_db(
|
|
dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
|
|
)
|
|
warmup(benchmark_context.warm_up, client=client)
|
|
ret = client.execute(
|
|
queries=full_workload,
|
|
num_workers=benchmark_context.num_workers_for_benchmark,
|
|
)[0]
|
|
usage_workload = vendor.stop_db(
|
|
dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
|
|
)
|
|
|
|
ret["database"] = usage_workload
|
|
|
|
results_key = [
|
|
dataset.NAME,
|
|
dataset.get_variant(),
|
|
group,
|
|
query + "_" + config_distribution,
|
|
WITHOUT_FINE_GRAINED_AUTHORIZATION,
|
|
]
|
|
results.set_value(*results_key, value=ret)
|
|
|
|
else:
|
|
# Executing mixed workload from groups of queries
|
|
full_workload = []
|
|
options = ["write", "read", "update", "analytical"]
|
|
function_type = random.choices(population=options, weights=percentage_distribution, k=num_of_queries)
|
|
|
|
for t in function_type:
|
|
# Get the appropriate functions with same probability
|
|
funcname = random.choices(queries_by_type[t], k=1)[0]
|
|
additional_query = getattr(dataset, funcname)
|
|
full_workload.append(additional_query())
|
|
|
|
vendor.start_db(dataset.NAME + dataset.get_variant() + "_" + "realistic" + "_" + config_distribution)
|
|
warmup(benchmark_context.warm_up, client=client)
|
|
ret = client.execute(
|
|
queries=full_workload,
|
|
num_workers=benchmark_context.num_workers_for_benchmark,
|
|
)[0]
|
|
usage_workload = vendor.stop_db(
|
|
dataset.NAME + dataset.get_variant() + "_" + "realistic" + "_" + config_distribution
|
|
)
|
|
mixed_workload = {
|
|
"count": ret["count"],
|
|
"duration": ret["duration"],
|
|
"retries": ret["retries"],
|
|
"throughput": ret["throughput"],
|
|
"num_workers": ret["num_workers"],
|
|
"database": usage_workload,
|
|
}
|
|
results_key = [
|
|
dataset.NAME,
|
|
dataset.get_variant(),
|
|
group,
|
|
config_distribution,
|
|
WITHOUT_FINE_GRAINED_AUTHORIZATION,
|
|
]
|
|
results.set_value(*results_key, value=mixed_workload)
|
|
|
|
print(mixed_workload)
|
|
|
|
|
|
def get_query_cache_count(
|
|
vendor: runners.BaseRunner,
|
|
client: runners.BaseClient,
|
|
queries: list,
|
|
config_key: list,
|
|
benchmark_context: BenchmarkContext,
|
|
):
|
|
cached_count = config.get_value(*config_key)
|
|
if cached_count is None:
|
|
log.info(
|
|
"Determining the number of queries necessary for {} seconds of single-threaded runtime...".format(
|
|
benchmark_context.single_threaded_runtime_sec
|
|
)
|
|
)
|
|
log.log("Running query to prime the query cache...")
|
|
# First run to prime the query caches.
|
|
vendor.start_db("cache")
|
|
client.execute(queries=queries, num_workers=1)
|
|
# Get a sense of the runtime.
|
|
count = 1
|
|
while True:
|
|
ret = client.execute(queries=get_queries(func, count), num_workers=1)
|
|
duration = ret[0]["duration"]
|
|
should_execute = int(benchmark_context.single_threaded_runtime_sec / (duration / count))
|
|
log.log(
|
|
"executed_queries={}, total_duration={}, query_duration={}, estimated_count={}".format(
|
|
count, duration, duration / count, should_execute
|
|
)
|
|
)
|
|
# We don't have to execute the next iteration when
|
|
# `should_execute` becomes the same order of magnitude as
|
|
# `count * 10`.
|
|
if should_execute / (count * 10) < 10:
|
|
count = should_execute
|
|
break
|
|
else:
|
|
count = count * 10
|
|
vendor.stop_db("cache")
|
|
|
|
if count < benchmark_context.query_count_lower_bound:
|
|
count = benchmark_context.query_count_lower_bound
|
|
|
|
config.set_value(
|
|
*config_key,
|
|
value={
|
|
"count": count,
|
|
"duration": benchmark_context.single_threaded_runtime_sec,
|
|
},
|
|
)
|
|
else:
|
|
log.log(
|
|
"Using cached query count of {} queries for {} seconds of single-threaded runtime to extrapolate .".format(
|
|
cached_count["count"], cached_count["duration"]
|
|
),
|
|
)
|
|
count = int(cached_count["count"] * benchmark_context.single_threaded_runtime_sec / cached_count["duration"])
|
|
return count
|
|
|
|
|
|
if __name__ == "__main__":
|
|
args = parse_args()
|
|
vendor_specific_args = helpers.parse_kwargs(args.vendor_specific)
|
|
|
|
assert args.benchmarks != None, helpers.list_available_workloads()
|
|
assert args.num_workers_for_import > 0
|
|
assert args.num_workers_for_benchmark > 0
|
|
assert args.export_results != None, "Pass where will results be saved"
|
|
assert (
|
|
args.single_threaded_runtime_sec >= 10
|
|
), "Low runtime value, consider extending time for more accurate results"
|
|
assert (
|
|
args.workload_realistic == None or args.workload_mixed == None
|
|
), "Cannot run both realistic and mixed workload, only one mode run at the time"
|
|
|
|
temp_dir = pathlib.Path.cwd() / ".temp"
|
|
temp_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
benchmark_context = BenchmarkContext(
|
|
benchmark_target_workload=args.benchmarks,
|
|
vendor_binary=args.vendor_binary if args.run_option == "vendor-native" else None,
|
|
vendor_name=args.vendor_name.replace("-", ""),
|
|
client_binary=args.client_binary if args.run_option == "vendor-native" else None,
|
|
num_workers_for_import=args.num_workers_for_import,
|
|
num_workers_for_benchmark=args.num_workers_for_benchmark,
|
|
single_threaded_runtime_sec=args.single_threaded_runtime_sec,
|
|
query_count_lower_bound=args.query_count_lower_bound,
|
|
no_load_query_counts=args.no_load_query_counts,
|
|
export_results=args.export_results,
|
|
temporary_directory=temp_dir.absolute(),
|
|
workload_mixed=args.workload_mixed,
|
|
workload_realistic=args.workload_realistic,
|
|
time_dependent_execution=args.time_depended_execution,
|
|
warm_up=args.warm_up,
|
|
performance_tracking=args.performance_tracking,
|
|
no_authorization=args.no_authorization,
|
|
customer_workloads=args.customer_workloads,
|
|
vendor_args=vendor_specific_args,
|
|
)
|
|
|
|
log.init("Executing benchmark with following arguments: ")
|
|
for key, value in benchmark_context.__dict__.items():
|
|
log.log("{:<30} : {:<30}".format(str(key), str(value)))
|
|
|
|
log.init("Check requirements for running benchmark")
|
|
if setup.check_requirements(benchmark_context=benchmark_context):
|
|
log.success("Requirements satisfied... ")
|
|
else:
|
|
log.warning("Requirements not satisfied...")
|
|
sys.exit(1)
|
|
|
|
log.log("Creating cache folder for: dataset, configurations, indexes, results etc. ")
|
|
cache = helpers.Cache()
|
|
log.init("Folder in use: " + cache.get_default_cache_directory())
|
|
if not benchmark_context.no_load_query_counts:
|
|
log.log("Using previous cached query count data from cache directory.")
|
|
config = cache.load_config()
|
|
else:
|
|
config = helpers.RecursiveDict()
|
|
results = helpers.RecursiveDict()
|
|
|
|
run_config = {
|
|
"vendor": benchmark_context.vendor_name,
|
|
"condition": benchmark_context.warm_up,
|
|
"num_workers_for_benchmark": benchmark_context.num_workers_for_benchmark,
|
|
"single_threaded_runtime_sec": benchmark_context.single_threaded_runtime_sec,
|
|
"benchmark_mode": benchmark_context.mode,
|
|
"benchmark_mode_config": benchmark_context.mode_config,
|
|
"platform": platform.platform(),
|
|
}
|
|
|
|
results.set_value("__run_configuration__", value=run_config)
|
|
|
|
available_workloads = helpers.get_available_workloads(benchmark_context.customer_workloads)
|
|
|
|
# Filter out the workloads based on the pattern
|
|
target_workloads = helpers.filter_workloads(
|
|
available_workloads=available_workloads, benchmark_context=benchmark_context
|
|
)
|
|
|
|
if len(target_workloads) == 0:
|
|
log.error("No workloads matched the pattern: " + str(benchmark_context.benchmark_target_workload))
|
|
log.error("Please check the pattern and workload NAME property, query group and query name.")
|
|
log.info("Currently available workloads: ")
|
|
log.log(helpers.list_available_workloads(benchmark_context.customer_workloads))
|
|
sys.exit(1)
|
|
|
|
# Run all target workloads.
|
|
for workload, queries in target_workloads:
|
|
log.info("Started running following workload: " + str(workload.NAME))
|
|
|
|
benchmark_context.set_active_workload(workload.NAME)
|
|
benchmark_context.set_active_variant(workload.get_variant())
|
|
|
|
log.init("Creating vendor runner for DB: " + benchmark_context.vendor_name)
|
|
vendor_runner = runners.BaseRunner.create(
|
|
benchmark_context=benchmark_context,
|
|
)
|
|
log.log("Class in use: " + str(vendor_runner.__class__.__name__))
|
|
|
|
log.info("Cleaning the database from any previous data")
|
|
vendor_runner.clean_db()
|
|
|
|
client = vendor_runner.fetch_client()
|
|
log.log("Get appropriate client for vendor " + str(client.__class__.__name__))
|
|
|
|
ret = None
|
|
usage = None
|
|
|
|
generated_queries = workload.dataset_generator()
|
|
if generated_queries:
|
|
print("\n")
|
|
log.info("Using workload as dataset generator...")
|
|
|
|
vendor_runner.start_db_init("import")
|
|
|
|
log.warning("Using following indexes...")
|
|
log.info(workload.indexes_generator())
|
|
log.info("Executing database index setup...")
|
|
ret = client.execute(queries=workload.indexes_generator(), num_workers=1)
|
|
log.log("Finished setting up indexes...")
|
|
for row in ret:
|
|
log.success(
|
|
"Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format(
|
|
row["count"], row["duration"], row["num_workers"], row["throughput"]
|
|
)
|
|
)
|
|
|
|
log.info("Importing dataset...")
|
|
ret = client.execute(queries=generated_queries, num_workers=benchmark_context.num_workers_for_import)
|
|
log.log("Finished importing dataset...")
|
|
usage = vendor_runner.stop_db_init("import")
|
|
else:
|
|
log.init("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
|
|
workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
|
|
log.info("Using workload dataset information for import...")
|
|
imported = workload.custom_import()
|
|
if not imported:
|
|
log.log("Basic import execution")
|
|
vendor_runner.start_db_init("import")
|
|
log.log("Executing database index setup...")
|
|
client.execute(file_path=workload.get_index(), num_workers=1)
|
|
log.log("Importing dataset...")
|
|
ret = client.execute(
|
|
file_path=workload.get_file(), num_workers=benchmark_context.num_workers_for_import
|
|
)
|
|
usage = vendor_runner.stop_db_init("import")
|
|
else:
|
|
log.info("Custom import executed...")
|
|
|
|
# Save import results.
|
|
import_key = [workload.NAME, workload.get_variant(), "__import__"]
|
|
if ret != None and usage != None:
|
|
# Display import statistics.
|
|
for row in ret:
|
|
log.success(
|
|
"Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format(
|
|
row["count"], row["duration"], row["num_workers"], row["throughput"]
|
|
)
|
|
)
|
|
|
|
log.success(
|
|
"The database used {} seconds of CPU time and peaked at {} MiB of RAM".format(
|
|
usage["cpu"], usage["memory"] / 1024 / 1024
|
|
)
|
|
)
|
|
|
|
results.set_value(*import_key, value={"client": ret, "database": usage})
|
|
else:
|
|
results.set_value(*import_key, value={"client": "custom_load", "database": "custom_load"})
|
|
|
|
# Run all benchmarks in all available groups.
|
|
for group in sorted(queries.keys()):
|
|
print("\n")
|
|
log.init("Running benchmark in " + benchmark_context.mode)
|
|
if benchmark_context.mode == "Mixed":
|
|
mixed_workload(vendor_runner, client, workload, group, queries, benchmark_context)
|
|
elif benchmark_context.mode == "Realistic":
|
|
mixed_workload(vendor_runner, client, workload, group, queries, benchmark_context)
|
|
else:
|
|
for query, funcname in queries[group]:
|
|
log.init(
|
|
"Running query:"
|
|
+ "{}/{}/{}/{}".format(group, query, funcname, WITHOUT_FINE_GRAINED_AUTHORIZATION),
|
|
)
|
|
func = getattr(workload, funcname)
|
|
|
|
# Query count
|
|
config_key = [
|
|
workload.NAME,
|
|
workload.get_variant(),
|
|
group,
|
|
query,
|
|
]
|
|
log.init("Determining query count for benchmark based on --single-threaded-runtime argument")
|
|
count = get_query_cache_count(
|
|
vendor_runner, client, get_queries(func, 1), config_key, benchmark_context
|
|
)
|
|
# Benchmark run.
|
|
sample_query = get_queries(func, 1)[0][0]
|
|
log.info("Sample query:{}".format(sample_query))
|
|
log.log(
|
|
"Executing benchmark with {} queries that should yield a single-threaded runtime of {} seconds.".format(
|
|
count, benchmark_context.single_threaded_runtime_sec
|
|
)
|
|
)
|
|
log.log(
|
|
"Queries are executed using {} concurrent clients".format(
|
|
benchmark_context.num_workers_for_benchmark
|
|
)
|
|
)
|
|
vendor_runner.start_db(
|
|
workload.NAME + workload.get_variant() + "_" + "_" + benchmark_context.mode + "_" + query
|
|
)
|
|
warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
|
|
log.init("Executing benchmark queries...")
|
|
if benchmark_context.time_dependent_execution != 0:
|
|
ret = client.execute(
|
|
queries=get_queries(func, count),
|
|
num_workers=benchmark_context.num_workers_for_benchmark,
|
|
time_dependent_execution=benchmark_context.time_depended_execution,
|
|
)[0]
|
|
else:
|
|
ret = client.execute(
|
|
queries=get_queries(func, count),
|
|
num_workers=benchmark_context.num_workers_for_benchmark,
|
|
)[0]
|
|
log.info("Benchmark execution finished...")
|
|
usage = vendor_runner.stop_db(
|
|
workload.NAME + workload.get_variant() + "_" + benchmark_context.mode + "_" + query
|
|
)
|
|
ret["database"] = usage
|
|
# Output summary.
|
|
|
|
log.log("Executed {} queries in {} seconds.".format(ret["count"], ret["duration"]))
|
|
log.log("Queries have been retried {} times".format(ret["retries"]))
|
|
log.log("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
|
|
log.info("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
|
|
if "docker" not in benchmark_context.vendor_name:
|
|
log.log("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
|
|
metadata = ret["metadata"]
|
|
for key in sorted(metadata.keys()):
|
|
log.log(
|
|
"{name:>30}: {minimum:>20.06f} {average:>20.06f} "
|
|
"{maximum:>20.06f}".format(name=key, **metadata[key])
|
|
)
|
|
print("\n")
|
|
log.info("Result:")
|
|
log.info(funcname)
|
|
log.info(sample_query)
|
|
log.success("Latency statistics:")
|
|
for key, value in ret["latency_stats"].items():
|
|
if key == "iterations":
|
|
log.success("{:<10} {:>10}".format(key, value))
|
|
else:
|
|
log.success("{:<10} {:>10.06f} seconds".format(key, value))
|
|
|
|
log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
|
|
print("\n\n")
|
|
|
|
# Save results.
|
|
results_key = [
|
|
workload.NAME,
|
|
workload.get_variant(),
|
|
group,
|
|
query,
|
|
WITHOUT_FINE_GRAINED_AUTHORIZATION,
|
|
]
|
|
results.set_value(*results_key, value=ret)
|
|
|
|
# If there is need for authorization testing.
|
|
if benchmark_context.no_authorization:
|
|
log.init("Running queries with authorization...")
|
|
log.info("Setting USER and PRIVILEGES...")
|
|
vendor_runner.start_db("authorization")
|
|
client.execute(
|
|
queries=[
|
|
("CREATE USER user IDENTIFIED BY 'test';", {}),
|
|
("GRANT ALL PRIVILEGES TO user;", {}),
|
|
("GRANT CREATE_DELETE ON EDGE_TYPES * TO user;", {}),
|
|
("GRANT CREATE_DELETE ON LABELS * TO user;", {}),
|
|
]
|
|
)
|
|
|
|
client.set_credentials(username="user", password="test")
|
|
vendor_runner.stop_db("authorization")
|
|
|
|
for query, funcname in queries[group]:
|
|
log.init(
|
|
"Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION)
|
|
)
|
|
func = getattr(workload, funcname)
|
|
|
|
config_key = [
|
|
workload.NAME,
|
|
workload.get_variant(),
|
|
group,
|
|
query,
|
|
]
|
|
count = get_query_cache_count(
|
|
vendor_runner, client, get_queries(func, 1), config_key, benchmark_context
|
|
)
|
|
|
|
vendor_runner.start_db("authorization")
|
|
warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
|
|
|
|
ret = client.execute(
|
|
queries=get_queries(func, count),
|
|
num_workers=benchmark_context.num_workers_for_benchmark,
|
|
)[0]
|
|
usage = vendor_runner.stop_db("authorization")
|
|
ret["database"] = usage
|
|
# Output summary.
|
|
log.log("Executed {} queries in {} seconds.".format(ret["count"], ret["duration"]))
|
|
log.log("Queries have been retried {} times".format(ret["retries"]))
|
|
log.log("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
|
|
log.log("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
|
|
log.log("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
|
|
metadata = ret["metadata"]
|
|
for key in sorted(metadata.keys()):
|
|
log.log(
|
|
"{name:>30}: {minimum:>20.06f} {average:>20.06f} "
|
|
"{maximum:>20.06f}".format(name=key, **metadata[key])
|
|
)
|
|
log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
|
|
# Save results.
|
|
results_key = [
|
|
workload.NAME,
|
|
workload.get_variant(),
|
|
group,
|
|
query,
|
|
WITH_FINE_GRAINED_AUTHORIZATION,
|
|
]
|
|
results.set_value(*results_key, value=ret)
|
|
|
|
log.info("Deleting USER and PRIVILEGES...")
|
|
vendor_runner.start_db("authorizations")
|
|
ret = client.execute(
|
|
queries=[
|
|
("REVOKE LABELS * FROM user;", {}),
|
|
("REVOKE EDGE_TYPES * FROM user;", {}),
|
|
("DROP USER user;", {}),
|
|
]
|
|
)
|
|
vendor_runner.stop_db("authorization")
|
|
|
|
# Save configuration.
|
|
if not benchmark_context.no_save_query_counts:
|
|
cache.save_config(config)
|
|
|
|
# Export results.
|
|
if benchmark_context.export_results:
|
|
with open(benchmark_context.export_results, "w") as f:
|
|
json.dump(results.get_data(), f)
|
|
|
|
# Results summary.
|
|
log.init("~" * 45)
|
|
log.info("Benchmark finished.")
|
|
log.init("~" * 45)
|
|
log.log("\n")
|
|
log.summary("Benchmark summary")
|
|
log.log("-" * 90)
|
|
log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage"))
|
|
with open(benchmark_context.export_results, "r") as f:
|
|
results = json.load(f)
|
|
for dataset, variants in results.items():
|
|
if dataset == "__run_configuration__":
|
|
continue
|
|
for variant, groups in variants.items():
|
|
for group, queries in groups.items():
|
|
if group == "__import__":
|
|
continue
|
|
for query, auth in queries.items():
|
|
for key, value in auth.items():
|
|
log.log("-" * 90)
|
|
log.summary(
|
|
"{:<20} {:>26.2f} QPS {:>27.2f} MB".format(
|
|
query, value["throughput"], value["database"]["memory"] / 1024.0 / 1024.0
|
|
)
|
|
)
|
|
log.log("-" * 90)
|