diff --git a/tests/mgbench/benchmark.py b/tests/mgbench/benchmark.py
index 5ce715571..498f04f44 100755
--- a/tests/mgbench/benchmark.py
+++ b/tests/mgbench/benchmark.py
@@ -25,6 +25,7 @@ import datasets
 import log
 import helpers
 import runners
+import importlib
 
 
 def get_queries(gen, count):
@@ -37,8 +38,7 @@ def get_queries(gen, count):
     return ret
 
 
-def match_patterns(dataset, variant, group, test, is_default_variant,
-                   patterns):
+def match_patterns(dataset, variant, group, test, is_default_variant, patterns):
     for pattern in patterns:
         verdict = [fnmatch.fnmatchcase(dataset, pattern[0])]
         if pattern[1] != "":
@@ -58,7 +58,7 @@ def filter_benchmarks(generators, patterns):
         pattern = patterns[i].split("/")
         if len(pattern) > 4 or len(pattern) == 0:
             raise Exception("Invalid benchmark description '" + pattern + "'!")
-        pattern.extend(["", "*", "*"][len(pattern) - 1:])
+        pattern.extend(["", "*", "*"][len(pattern) - 1 :])
         patterns[i] = pattern
     filtered = []
     for dataset in sorted(generators.keys()):
@@ -68,8 +68,7 @@ def filter_benchmarks(generators, patterns):
             current = collections.defaultdict(list)
             for group in tests:
                 for test_name, test_func in tests[group]:
-                    if match_patterns(dataset, variant, group, test_name,
-                                      is_default_variant, patterns):
+                    if match_patterns(dataset, variant, group, test_name, is_default_variant, patterns):
                         current[group].append((test_name, test_func))
             if len(current) > 0:
                 filtered.append((generator(variant), dict(current)))
@@ -78,54 +77,61 @@ def filter_benchmarks(generators, patterns):
 
 # Parse options.
 parser = argparse.ArgumentParser(
-    description="Memgraph benchmark executor.",
-    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-parser.add_argument("benchmarks", nargs="*", default="",
-                    help="descriptions of benchmarks that should be run; "
-                    "multiple descriptions can be specified to run multiple "
-                    "benchmarks; the description is specified as "
-                    "dataset/variant/group/test; Unix shell-style wildcards "
-                    "can be used in the descriptions; variant, group and test "
-                    "are optional and they can be left out; the default "
-                    "variant is '' which selects the default dataset variant; "
-                    "the default group is '*' which selects all groups; the "
-                    "default test is '*' which selects all tests")
-parser.add_argument("--memgraph-binary",
-                    default=helpers.get_binary_path("memgraph"),
-                    help="Memgraph binary used for benchmarking")
-parser.add_argument("--client-binary",
-                    default=helpers.get_binary_path("tests/mgbench/client"),
-                    help="client binary used for benchmarking")
-parser.add_argument("--num-workers-for-import", type=int,
-                    default=multiprocessing.cpu_count() // 2,
-                    help="number of workers used to import the dataset")
-parser.add_argument("--num-workers-for-benchmark", type=int,
-                    default=1,
-                    help="number of workers used to execute the benchmark")
-parser.add_argument("--single-threaded-runtime-sec", type=int,
-                    default=10,
-                    help="single threaded duration of each test")
-parser.add_argument("--no-load-query-counts", action="store_true",
-                    help="disable loading of cached query counts")
-parser.add_argument("--no-save-query-counts", action="store_true",
-                    help="disable storing of cached query counts")
-parser.add_argument("--export-results", default="",
-                    help="file path into which results should be exported")
-parser.add_argument("--temporary-directory", default="/tmp",
-                    help="directory path where temporary data should "
-                         "be stored")
-parser.add_argument("--no-properties-on-edges", action="store_true",
-                    help="disable properties on edges")
+    description="Memgraph benchmark executor.", formatter_class=argparse.ArgumentDefaultsHelpFormatter
+)
+parser.add_argument(
+    "benchmarks",
+    nargs="*",
+    default="",
+    help="descriptions of benchmarks that should be run; "
+    "multiple descriptions can be specified to run multiple "
+    "benchmarks; the description is specified as "
+    "dataset/variant/group/test; Unix shell-style wildcards "
+    "can be used in the descriptions; variant, group and test "
+    "are optional and they can be left out; the default "
+    "variant is '' which selects the default dataset variant; "
+    "the default group is '*' which selects all groups; the "
+    "default test is '*' which selects all tests",
+)
+parser.add_argument(
+    "--memgraph-binary", default=helpers.get_binary_path("memgraph"), help="Memgraph binary used for benchmarking"
+)
+parser.add_argument(
+    "--client-binary",
+    default=helpers.get_binary_path("tests/mgbench/client"),
+    help="client binary used for benchmarking",
+)
+parser.add_argument(
+    "--num-workers-for-import",
+    type=int,
+    default=multiprocessing.cpu_count() // 2,
+    help="number of workers used to import the dataset",
+)
+parser.add_argument(
+    "--num-workers-for-benchmark", type=int, default=1, help="number of workers used to execute the benchmark"
+)
+parser.add_argument("--single-threaded-runtime-sec", type=int, default=10, help="single threaded duration of each test")
+parser.add_argument("--no-load-query-counts", action="store_true", help="disable loading of cached query counts")
+parser.add_argument("--no-save-query-counts", action="store_true", help="disable storing of cached query counts")
+parser.add_argument("--export-results", default="", help="file path into which results should be exported")
+parser.add_argument(
+    "--temporary-directory", default="/tmp", help="directory path where temporary data should " "be stored"
+)
+parser.add_argument("--no-properties-on-edges", action="store_true", help="disable properties on edges")
+parser.add_argument("--datasets", default="datasets", help="datasets to scan")
+parser.add_argument("--datasets-path", default=".", help="path to datasets to scan")
 args = parser.parse_args()
 
+sys.path.append(args.datasets_path)
+dataset_to_use = importlib.import_module(args.datasets)
+
 # Detect available datasets.
 generators = {}
-for key in dir(datasets):
+for key in dir(dataset_to_use):
     if key.startswith("_"):
         continue
-    dataset = getattr(datasets, key)
-    if not inspect.isclass(dataset) or dataset == datasets.Dataset or \
-            not issubclass(dataset, datasets.Dataset):
+    dataset = getattr(dataset_to_use, key)
+    if not inspect.isclass(dataset) or dataset == datasets.Dataset or not issubclass(dataset, datasets.Dataset):
         continue
     tests = collections.defaultdict(list)
     for funcname in dir(dataset):
@@ -135,8 +141,9 @@ for key in dir(datasets):
         tests[group].append((test, funcname))
     generators[dataset.NAME] = (dataset, dict(tests))
     if dataset.PROPERTIES_ON_EDGES and args.no_properties_on_edges:
-        raise Exception("The \"{}\" dataset requires properties on edges, "
-                        "but you have disabled them!".format(dataset.NAME))
+        raise Exception(
+            'The "{}" dataset requires properties on edges, ' "but you have disabled them!".format(dataset.NAME)
+        )
 
 # List datasets if there is no specified dataset.
 if len(args.benchmarks) == 0:
@@ -144,8 +151,7 @@ if len(args.benchmarks) == 0:
     for name in sorted(generators.keys()):
         print("Dataset:", name)
         dataset, tests = generators[name]
-        print("    Variants:", ", ".join(dataset.VARIANTS),
-              "(default: " + dataset.DEFAULT_VARIANT + ")")
+        print("    Variants:", ", ".join(dataset.VARIANTS), "(default: " + dataset.DEFAULT_VARIANT + ")")
         for group in sorted(tests.keys()):
             print("    Group:", group)
             for test_name, test_func in tests[group]:
@@ -165,31 +171,38 @@ benchmarks = filter_benchmarks(generators, args.benchmarks)
 
 # Run all specified benchmarks.
 for dataset, tests in benchmarks:
-    log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(),
-             "dataset")
-    dataset.prepare(cache.cache_directory("datasets", dataset.NAME,
-                                          dataset.get_variant()))
+    log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(), "dataset")
+    dataset.prepare(cache.cache_directory("datasets", dataset.NAME, dataset.get_variant()))
 
     # Prepare runners and import the dataset.
-    memgraph = runners.Memgraph(args.memgraph_binary, args.temporary_directory,
-                                not args.no_properties_on_edges)
+    memgraph = runners.Memgraph(args.memgraph_binary, args.temporary_directory, not args.no_properties_on_edges)
     client = runners.Client(args.client_binary, args.temporary_directory)
     memgraph.start_preparation()
-    ret = client.execute(file_path=dataset.get_file(),
-                         num_workers=args.num_workers_for_import)
+    ret = client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
     usage = memgraph.stop()
 
     # Display import statistics.
     print()
     for row in ret:
-        print("Executed", row["count"], "queries in", row["duration"],
-              "seconds using", row["num_workers"],
-              "workers with a total throughput of", row["throughput"],
-              "queries/second.")
+        print(
+            "Executed",
+            row["count"],
+            "queries in",
+            row["duration"],
+            "seconds using",
+            row["num_workers"],
+            "workers with a total throughput of",
+            row["throughput"],
+            "queries/second.",
+        )
     print()
-    print("The database used", usage["cpu"],
-          "seconds of CPU time and peaked at",
-          usage["memory"] / 1024 / 1024, "MiB of RAM.")
+    print(
+        "The database used",
+        usage["cpu"],
+        "seconds of CPU time and peaked at",
+        usage["memory"] / 1024 / 1024,
+        "MiB of RAM.",
+    )
 
     # Save import results.
     import_key = [dataset.NAME, dataset.get_variant(), "__import__"]
@@ -208,24 +221,26 @@ for dataset, tests in benchmarks:
             config_key = [dataset.NAME, dataset.get_variant(), group, test]
             cached_count = config.get_value(*config_key)
             if cached_count is None:
-                print("Determining the number of queries necessary for",
-                      args.single_threaded_runtime_sec,
-                      "seconds of single-threaded runtime...")
+                print(
+                    "Determining the number of queries necessary for",
+                    args.single_threaded_runtime_sec,
+                    "seconds of single-threaded runtime...",
+                )
                 # First run to prime the query caches.
                 memgraph.start_benchmark()
                 client.execute(queries=get_queries(func, 1), num_workers=1)
                 # Get a sense of the runtime.
                 count = 1
                 while True:
-                    ret = client.execute(queries=get_queries(func, count),
-                                         num_workers=1)
+                    ret = client.execute(queries=get_queries(func, count), num_workers=1)
                     duration = ret[0]["duration"]
-                    should_execute = int(args.single_threaded_runtime_sec /
-                                         (duration / count))
-                    print("executed_queries={}, total_duration={}, "
-                          "query_duration={}, estimated_count={}".format(
-                              count, duration, duration / count,
-                              should_execute))
+                    should_execute = int(args.single_threaded_runtime_sec / (duration / count))
+                    print(
+                        "executed_queries={}, total_duration={}, "
+                        "query_duration={}, estimated_count={}".format(
+                            count, duration, duration / count, should_execute
+                        )
+                    )
                     # We don't have to execute the next iteration when
                     # `should_execute` becomes the same order of magnitude as
                     # `count * 10`.
@@ -235,45 +250,45 @@ for dataset, tests in benchmarks:
                     else:
                         count = count * 10
                 memgraph.stop()
-                config.set_value(*config_key, value={
-                    "count": count,
-                    "duration": args.single_threaded_runtime_sec})
+                config.set_value(*config_key, value={"count": count, "duration": args.single_threaded_runtime_sec})
             else:
-                print("Using cached query count of", cached_count["count"],
-                      "queries for", cached_count["duration"],
-                      "seconds of single-threaded runtime.")
-                count = int(cached_count["count"] *
-                            args.single_threaded_runtime_sec /
-                            cached_count["duration"])
+                print(
+                    "Using cached query count of",
+                    cached_count["count"],
+                    "queries for",
+                    cached_count["duration"],
+                    "seconds of single-threaded runtime.",
+                )
+                count = int(cached_count["count"] * args.single_threaded_runtime_sec / cached_count["duration"])
 
             # Benchmark run.
             print("Sample query:", get_queries(func, 1)[0][0])
-            print("Executing benchmark with", count, "queries that should "
-                  "yield a single-threaded runtime of",
-                  args.single_threaded_runtime_sec, "seconds.")
-            print("Queries are executed using", args.num_workers_for_benchmark,
-                  "concurrent clients.")
+            print(
+                "Executing benchmark with",
+                count,
+                "queries that should " "yield a single-threaded runtime of",
+                args.single_threaded_runtime_sec,
+                "seconds.",
+            )
+            print("Queries are executed using", args.num_workers_for_benchmark, "concurrent clients.")
             memgraph.start_benchmark()
-            ret = client.execute(queries=get_queries(func, count),
-                                 num_workers=args.num_workers_for_benchmark)[0]
+            ret = client.execute(queries=get_queries(func, count), num_workers=args.num_workers_for_benchmark)[0]
             usage = memgraph.stop()
             ret["database"] = usage
 
             # Output summary.
             print()
-            print("Executed", ret["count"], "queries in",
-                  ret["duration"], "seconds.")
+            print("Executed", ret["count"], "queries in", ret["duration"], "seconds.")
             print("Queries have been retried", ret["retries"], "times.")
-            print("Database used {:.3f} seconds of CPU time.".format(
-                usage["cpu"]))
-            print("Database peaked at {:.3f} MiB of memory.".format(
-                usage["memory"] / 1024.0 / 1024.0))
-            print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min",
-                                                       "avg", "max"))
+            print("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
+            print("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
+            print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
             metadata = ret["metadata"]
             for key in sorted(metadata.keys()):
-                print("{name:>30}: {minimum:>20.06f} {average:>20.06f} "
-                      "{maximum:>20.06f}".format(name=key, **metadata[key]))
+                print(
+                    "{name:>30}: {minimum:>20.06f} {average:>20.06f} "
+                    "{maximum:>20.06f}".format(name=key, **metadata[key])
+                )
             log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
 
             # Save results.
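Not part of the patch, but for context: a minimal sketch of how the new --datasets / --datasets-path options could be exercised. The module name my_datasets.py, the class Demo, its variant, and its query are hypothetical; the sketch assumes the custom module still imports the stock datasets module and subclasses datasets.Dataset (the detection loop above keeps checking issubclass(dataset, datasets.Dataset)), and that anything not visible in this diff -- file preparation, the benchmark__<group>__<test> method naming, the (query, params) return shape -- follows the stock datasets module.

# my_datasets.py -- hypothetical custom dataset module (illustration only).
# benchmark.py loads it at runtime via:
#   sys.path.append(args.datasets_path); importlib.import_module(args.datasets)
# e.g.:  ./benchmark.py --datasets-path /path/to/modules --datasets my_datasets "demo/*/*/*"
import datasets  # stock module, so issubclass(Demo, datasets.Dataset) stays true


class Demo(datasets.Dataset):
    # Attributes that benchmark.py reads (all of them appear in the diff above).
    NAME = "demo"
    VARIANTS = ["small"]
    DEFAULT_VARIANT = "small"
    PROPERTIES_ON_EDGES = False

    # Dataset preparation (prepare()/get_file()) is assumed to be inherited from
    # datasets.Dataset; only a benchmark query is defined here. The method name
    # follows the stock benchmark__<group>__<test> convention, and the return
    # value mirrors the (query, params) pairs produced by get_queries().
    def benchmark__read__single_vertex(self):
        return ("MATCH (n) RETURN n LIMIT 1;", {})

With such a module on --datasets-path, the detection loop over dir(dataset_to_use) would register "demo" exactly like the built-in datasets, so it shows up in the listing and can be selected with the usual dataset/variant/group/test patterns.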