From efdf7baea0e6ead04b2f74f44ed5e33beeb97b7a Mon Sep 17 00:00:00 2001
From: Andi
Date: Fri, 22 Sep 2023 19:05:16 +0200
Subject: [PATCH] Refactor mgbench

---
 tests/mgbench/benchmark.py         | 950 ++++++++++++++++-------
 tests/mgbench/benchmark_context.py |  23 +-
 tests/mgbench/graph_bench.py       |   2 +
 tests/mgbench/helpers.py           |   1 +
 tests/mgbench/workload_mode.py     |  14 +
 5 files changed, 551 insertions(+), 439 deletions(-)
 create mode 100644 tests/mgbench/workload_mode.py

diff --git a/tests/mgbench/benchmark.py b/tests/mgbench/benchmark.py
index fcec5e811..12af6ead7 100755
--- a/tests/mgbench/benchmark.py
+++ b/tests/mgbench/benchmark.py
@@ -18,21 +18,92 @@ import pathlib
 import platform
 import random
 import sys
+import time
+from typing import Dict, List
 
 import helpers
 import log
 import runners
 import setup
 from benchmark_context import BenchmarkContext
+from workload_mode import BENCHMARK_MODE_MIXED, BENCHMARK_MODE_REALISTIC
 from workloads import *
 
 WITH_FINE_GRAINED_AUTHORIZATION = "with_fine_grained_authorization"
 WITHOUT_FINE_GRAINED_AUTHORIZATION = "without_fine_grained_authorization"
+RUN_CONFIGURATION = "__run_configuration__"
+IMPORT = "__import__"
+THROUGHPUT = "throughput"
+DATABASE = "database"
+MEMORY = "memory"
+VENDOR = "vendor"
+CONDITION = "condition"
+NUM_WORKERS_FOR_BENCHMARK = "num_workers_for_benchmark"
+SINGLE_THREADED_RUNTIME_SEC = "single_threaded_runtime_sec"
+BENCHMARK_MODE = "benchmark_mode"
+BENCHMARK_MODE_CONFIG = "benchmark_mode_config"
+PLATFORM = "platform"
+COUNT = "count"
+DURATION = "duration"
+NUM_WORKERS = "num_workers"
+CPU = "cpu"
+VENDOR_RUNNER_IMPORT = "import"
+VENDOR_RUNNER_AUTHORIZATION = "authorization"
+CLIENT = "client"
+CUSTOM_LOAD = "custom_load"
+RETRIES = "retries"
+DOCKER = "docker"
+METADATA = "metadata"
+LATENCY_STATS = "latency_stats"
+ITERATIONS = "iterations"
+USERNAME = "user"
+PASSWORD = "test"
+DATABASE_CONDITION_HOT = "hot"
+DATABASE_CONDITION_VULCANIC = "vulcanic"
+WRITE_TYPE_QUERY = "write"
+READ_TYPE_QUERY = "read"
+UPDATE_TYPE_QUERY = "update"
+ANALYTICAL_TYPE_QUERY = "analytical"
+QUERY = "query"
+CACHE = "cache"
+DISK_PREPARATION_RSS = "disk_storage_preparation"
+IN_MEMORY_ANALYTICAL_RSS = "in_memory_analytical_preparation"
+
+IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL"
+
+
+WARMUP_TO_HOT_QUERIES = [
+    ("CREATE ();", {}),
+    ("CREATE ()-[:TempEdge]->();", {}),
+    ("MATCH (n) RETURN count(n.prop) LIMIT 1;", {}),
+]
+
+SETUP_AUTH_QUERIES = [
+    ("CREATE USER user IDENTIFIED BY 'test';", {}),
+    ("GRANT ALL PRIVILEGES TO user;", {}),
+    ("GRANT CREATE_DELETE ON EDGE_TYPES * TO user;", {}),
+    ("GRANT CREATE_DELETE ON LABELS * TO user;", {}),
+]
+
+CLEANUP_AUTH_QUERIES = [
+    ("REVOKE LABELS * FROM user;", {}),
+    ("REVOKE EDGE_TYPES * FROM user;", {}),
+    ("DROP USER user;", {}),
+]
+
+SETUP_DISK_STORAGE = [
+    ("STORAGE MODE ON_DISK_TRANSACTIONAL;", {}),
+]
+
+SETUP_IN_MEMORY_ANALYTICAL_STORAGE_MODE = [
+    ("STORAGE MODE IN_MEMORY_ANALYTICAL;", {}),
+]
 
 
 def parse_args():
     parser = argparse.ArgumentParser(description="Main parser.", add_help=False)
-
     benchmark_parser = argparse.ArgumentParser(description="Benchmark arguments parser", add_help=False)
 
     benchmark_parser.add_argument(
@@ -149,6 +220,20 @@ def parse_args():
 
     benchmark_parser.add_argument("--customer-workloads", default=None, help="Path to customers workloads")
 
+    benchmark_parser.add_argument(
+        "--disk-storage",
+        action="store_true",
+        default=False,
+        help="If the flag is set, benchmarks will also be run for disk storage.",
+    )
+
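+    # Analogous to --disk-storage above: an opt-in flag that extends the
+    # default in-memory transactional run with an additional storage mode.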
+    benchmark_parser.add_argument(
+        "--in-memory-analytical",
+        action="store_true",
+        default=False,
+        help="If the flag is set, benchmarks will also be run for in_memory_analytical.",
+    )
+
     benchmark_parser.add_argument(
         "--vendor-specific",
         nargs="*",
@@ -158,7 +243,6 @@ def parse_args():
 
     subparsers = parser.add_subparsers(help="Subparsers", dest="run_option")
 
-    # Vendor native parser starts here
     parser_vendor_native = subparsers.add_parser(
         "vendor-native",
         help="Running database in binary native form",
@@ -182,7 +266,6 @@ def parse_args():
         help="Client binary used for benchmarking",
     )
 
-    # Vendor docker parsers starts here
     parser_vendor_docker = subparsers.add_parser(
         "vendor-docker", help="Running database in docker", parents=[benchmark_parser]
     )
@@ -197,28 +280,21 @@ def parse_args():
 
 
 def get_queries(gen, count):
-    # Make the generator deterministic.
     random.seed(gen.__name__)
-    # Generate queries.
     ret = []
-    for i in range(count):
+    for _ in range(count):
         ret.append(gen())
     return ret
 
 
 def warmup(condition: str, client: runners.BaseRunner, queries: list = None):
-    log.init("Started warm-up procedure to match database condition: {} ".format(condition))
-    if condition == "hot":
+    if condition == DATABASE_CONDITION_HOT:
         log.log("Execute warm-up to match condition: {} ".format(condition))
         client.execute(
-            queries=[
-                ("CREATE ();", {}),
-                ("CREATE ()-[:TempEdge]->();", {}),
-                ("MATCH (n) RETURN count(n.prop) LIMIT 1;", {}),
-            ],
+            queries=WARMUP_TO_HOT_QUERIES,
             num_workers=1,
         )
-    elif condition == "vulcanic":
+    elif condition == DATABASE_CONDITION_VULCANIC:
         log.log("Execute warm-up to match condition: {} ".format(condition))
         client.execute(queries=queries)
     else:
@@ -226,9 +302,22 @@ def warmup(condition: str, client: runners.BaseRunner, queries: list = None):
     log.log("Finished warm-up procedure to match database condition: {} ".format(condition))
 
 
-def mixed_workload(
-    vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
-):
+def validate_workload_distribution(percentage_distribution, queries_by_type):
+    percentages_by_type = {
+        WRITE_TYPE_QUERY: percentage_distribution[0],
+        READ_TYPE_QUERY: percentage_distribution[1],
+        UPDATE_TYPE_QUERY: percentage_distribution[2],
+        ANALYTICAL_TYPE_QUERY: percentage_distribution[3],
+    }
+
+    for key, percentage in percentages_by_type.items():
+        if percentage != 0 and len(queries_by_type[key]) == 0:
+            raise Exception(
+                "There is a missing query in group (write, read, update or analytical) for the given workload distribution."
+            )
+
+
+def prepare_for_workload(benchmark_context, dataset, group, queries):
     num_of_queries = benchmark_context.mode_config[0]
     percentage_distribution = benchmark_context.mode_config[1:]
     if sum(percentage_distribution) != 100:
@@ -237,10 +326,19 @@ def mixed_workload(
             percentage_distribution,
         )
     s = [str(i) for i in benchmark_context.mode_config]
     config_distribution = "_".join(s)
-    log.log("Generating mixed workload...")
+
+    queries_by_type = {
+        WRITE_TYPE_QUERY: [],
+        READ_TYPE_QUERY: [],
+        UPDATE_TYPE_QUERY: [],
+        ANALYTICAL_TYPE_QUERY: [],
+    }
+
+    for _, funcname in queries[group]:
+        for key in queries_by_type.keys():
+            if key in funcname:
+                queries_by_type[key].append(funcname)
 
     percentages_by_type = {
         "write": percentage_distribution[0],
@@ -249,128 +347,138 @@ def mixed_workload(
         "analytical": percentage_distribution[3],
     }
 
-    queries_by_type = {
-        "write": [],
-        "read": [],
-        "update": [],
-        "analytical": [],
-    }
-
-    for _, funcname in queries[group]:
-        for key in queries_by_type.keys():
-            if key in funcname:
-                queries_by_type[key].append(funcname)
-
-    for key, percentage in percentages_by_type.items():
-        if percentage != 0 and len(queries_by_type[key]) == 0:
-            raise Exception(
-                "There is a missing query in group (write, read, update or analytical) for given workload distribution."
-            )
+    validate_workload_distribution(percentage_distribution, queries_by_type)
     random.seed(config_distribution)
 
-    # Executing mixed workload for each test
-    if benchmark_context.mode == "Mixed":
-        for query, funcname in queries[group]:
-            full_workload = []
+    return config_distribution, queries_by_type, percentages_by_type, percentage_distribution, num_of_queries
 
-            log.info(
-                "Running query in mixed workload: {}/{}/{}".format(
-                    group,
-                    query,
-                    funcname,
-                ),
-            )
-            base_query = getattr(dataset, funcname)
 
-            base_query_type = funcname.rsplit("_", 1)[1]
+def realistic_workload(
+    vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
+):
+    log.log("Executing realistic workload...")
+    config_distribution, queries_by_type, _, percentage_distribution, num_of_queries = prepare_for_workload(
+        benchmark_context, dataset, group, queries
+    )
 
-            if percentages_by_type.get(base_query_type, 0) > 0:
-                continue
+    options = [WRITE_TYPE_QUERY, READ_TYPE_QUERY, UPDATE_TYPE_QUERY, ANALYTICAL_TYPE_QUERY]
+    function_type = random.choices(population=options, weights=percentage_distribution, k=num_of_queries)
 
-            options = ["write", "read", "update", "analytical", "query"]
-            function_type = random.choices(population=options, weights=percentage_distribution, k=num_of_queries)
+    prepared_queries = []
+    for t in function_type:
+        # Get the appropriate functions with same probability
+        funcname = random.choices(queries_by_type[t], k=1)[0]
+        additional_query = getattr(dataset, funcname)
+        prepared_queries.append(additional_query())
 
-            for t in function_type:
-                # Get the appropriate functions with same probabilty
-                if t == "query":
-                    full_workload.append(base_query())
-                else:
-                    funcname = random.choices(queries_by_type[t], k=1)[0]
-                    additional_query = getattr(dataset, funcname)
-                    full_workload.append(additional_query())
+    rss_db = dataset.NAME + dataset.get_variant() + "_" + "realistic" + "_" + config_distribution
+    vendor.start_db(rss_db)
+    warmup(benchmark_context.warm_up, client=client)
 
-            vendor.start_db(
-                dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
-            )
-            warmup(benchmark_context.warm_up, client=client)
-            ret = client.execute(
-                queries=full_workload,
-                num_workers=benchmark_context.num_workers_for_benchmark,
-            )[0]
-            usage_workload = vendor.stop_db(
-                dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
-            )
+    ret = client.execute(
+        queries=prepared_queries,
+        num_workers=benchmark_context.num_workers_for_benchmark,
+    )[0]
 
-            ret["database"] = usage_workload
+    usage_workload = vendor.stop_db(rss_db)
 
-            results_key = [
-                dataset.NAME,
-                dataset.get_variant(),
+    realistic_workload_res = {
+        COUNT: ret[COUNT],
+        DURATION: ret[DURATION],
+        RETRIES: ret[RETRIES],
+        THROUGHPUT: ret[THROUGHPUT],
+        NUM_WORKERS: ret[NUM_WORKERS],
+        DATABASE: usage_workload,
+    }
+    results_key = [
+        dataset.NAME,
+        dataset.get_variant(),
+        group,
+        config_distribution,
+        WITHOUT_FINE_GRAINED_AUTHORIZATION,
+    ]
+    results.set_value(*results_key, value=realistic_workload_res)
+
+
+def mixed_workload(
+    vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
+):
+    log.log("Executing mixed workload...")
+    (
+        config_distribution,
+        queries_by_type,
+        percentages_by_type,
+        percentage_distribution,
+        num_of_queries,
+    ) = prepare_for_workload(benchmark_context, dataset, group, queries)
+
+    options = [WRITE_TYPE_QUERY, READ_TYPE_QUERY, UPDATE_TYPE_QUERY, ANALYTICAL_TYPE_QUERY, QUERY]
+
+    for query, funcname in queries[group]:
+        log.info(
+            "Running query in mixed workload: {}/{}/{}".format(
                 group,
-                query + "_" + config_distribution,
-                WITHOUT_FINE_GRAINED_AUTHORIZATION,
-            ]
-            results.set_value(*results_key, value=ret)
+                query,
+                funcname,
+            ),
+        )
+        base_query_type = funcname.rsplit("_", 1)[1]
+        if percentages_by_type.get(base_query_type, 0) > 0:
+            continue
 
-    else:
-        # Executing mixed workload from groups of queries
-        full_workload = []
-        options = ["write", "read", "update", "analytical"]
         function_type = random.choices(population=options, weights=percentage_distribution, k=num_of_queries)
+        prepared_queries = []
+        base_query = getattr(dataset, funcname)
         for t in function_type:
-            # Get the appropriate functions with same probability
-            funcname = random.choices(queries_by_type[t], k=1)[0]
-            additional_query = getattr(dataset, funcname)
-            full_workload.append(additional_query())
+            if t == QUERY:
+                prepared_queries.append(base_query())
+            else:
+                funcname = random.choices(queries_by_type[t], k=1)[0]
+                additional_query = getattr(dataset, funcname)
+                prepared_queries.append(additional_query())
 
-        vendor.start_db(dataset.NAME + dataset.get_variant() + "_" + "realistic" + "_" + config_distribution)
+        rss_db = dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
+        vendor.start_db(rss_db)
         warmup(benchmark_context.warm_up, client=client)
+
         ret = client.execute(
-            queries=full_workload,
+            queries=prepared_queries,
             num_workers=benchmark_context.num_workers_for_benchmark,
         )[0]
-        usage_workload = vendor.stop_db(
-            dataset.NAME + dataset.get_variant() + "_" + "realistic" + "_" + config_distribution
-        )
-        mixed_workload = {
-            "count": ret["count"],
-            "duration": ret["duration"],
-            "retries": ret["retries"],
-            "throughput": ret["throughput"],
-            "num_workers": ret["num_workers"],
-            "database": usage_workload,
-        }
+
+        usage_workload = vendor.stop_db(rss_db)
+
+        ret[DATABASE] = usage_workload
+
         results_key = [
             dataset.NAME,
             dataset.get_variant(),
             group,
-            config_distribution,
+            query + "_" + config_distribution,
             WITHOUT_FINE_GRAINED_AUTHORIZATION,
         ]
-        results.set_value(*results_key, value=mixed_workload)
-
-        print(mixed_workload)
+        results.set_value(*results_key, value=ret)
 
 
 def get_query_cache_count(
     vendor: runners.BaseRunner,
     client: runners.BaseClient,
-    queries: list,
-    config_key: list,
+    queries: List,
     benchmark_context: BenchmarkContext,
+    workload: str,
+    group: str,
+    query: str,
+    func: str,
 ):
+    log.init("Determining query count for benchmark based on --single-threaded-runtime argument")
+    config_key = [workload.NAME, workload.get_variant(), group, query]
     cached_count = config.get_value(*config_key)
     if cached_count is None:
         log.info(
@@ -379,14 +487,12 @@ def get_query_cache_count(
             )
         )
         log.log("Running query to prime the query cache...")
-        # First run to prime the query caches.
-        vendor.start_db("cache")
+        vendor.start_db(CACHE)
         client.execute(queries=queries, num_workers=1)
-        # Get a sense of the runtime.
         count = 1
         while True:
             ret = client.execute(queries=get_queries(func, count), num_workers=1)
-            duration = ret[0]["duration"]
+            duration = ret[0][DURATION]
             should_execute = int(benchmark_context.single_threaded_runtime_sec / (duration / count))
             log.log(
                 "executed_queries={}, total_duration={}, query_duration={}, estimated_count={}".format(
@@ -401,7 +507,7 @@ def get_query_cache_count(
                 break
             else:
                 count = count * 10
-        vendor.stop_db("cache")
+        vendor.stop_db(CACHE)
 
         if count < benchmark_context.query_count_lower_bound:
             count = benchmark_context.query_count_lower_bound
@@ -409,24 +515,69 @@ def get_query_cache_count(
         config.set_value(
             *config_key,
             value={
-                "count": count,
-                "duration": benchmark_context.single_threaded_runtime_sec,
+                COUNT: count,
+                DURATION: benchmark_context.single_threaded_runtime_sec,
             },
         )
     else:
         log.log(
-            "Using cached query count of {} queries for {} seconds of single-threaded runtime to extrapolate .".format(
-                cached_count["count"], cached_count["duration"]
+            "Using cached query count of {} queries for {} seconds of single-threaded runtime to extrapolate.".format(
+                cached_count[COUNT], cached_count[DURATION]
             ),
         )
-        count = int(cached_count["count"] * benchmark_context.single_threaded_runtime_sec / cached_count["duration"])
+        count = int(cached_count[COUNT] * benchmark_context.single_threaded_runtime_sec / cached_count[DURATION])
     return count
 
 
-if __name__ == "__main__":
-    args = parse_args()
-    vendor_specific_args = helpers.parse_kwargs(args.vendor_specific)
+def log_benchmark_summary(results: Dict):
+    log.init("~" * 45)
+    log.info("Benchmark finished.")
+    log.init("~" * 45)
+    log.log("\n")
+    log.summary("Benchmark summary")
+    log.log("-" * 90)
+    log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage"))
+    for dataset, variants in results.items():
+        if dataset == RUN_CONFIGURATION:
+            continue
+        for groups in variants.values():
+            for group, queries in groups.items():
+                if group == IMPORT:
+                    continue
+                for query, auth in queries.items():
+                    for value in auth.values():
+                        log.log("-" * 90)
+                        log.summary(
+                            "{:<20} {:>26.2f} QPS {:>27.2f} MB".format(
+                                query, value[THROUGHPUT], value[DATABASE][MEMORY] / (1024.0 * 1024.0)
+                            )
+                        )
+    log.log("-" * 90)
+
+
+def log_benchmark_arguments(benchmark_context):
+    log.init("Executing benchmark with following arguments: ")
+    for key, value in benchmark_context.__dict__.items():
+        log.log("{:<30} : {:<30}".format(str(key), str(value)))
+
+
+def check_benchmark_requirements(benchmark_context):
+    if setup.check_requirements(benchmark_context):
+        log.success("Requirements for starting benchmark satisfied!")
+    else:
+        log.warning("Requirements for starting benchmark not satisfied!")
+        sys.exit(1)
+
+
+def setup_cache_config(benchmark_context, cache):
+    if not benchmark_context.no_load_query_counts:
log.log("Using previous cached query count data from cache directory.") + return cache.load_config() + else: + return helpers.RecursiveDict() + + +def sanitize_args(args): assert args.benchmarks != None, helpers.list_available_workloads() assert args.num_workers_for_import > 0 assert args.num_workers_for_benchmark > 0 @@ -438,6 +589,236 @@ if __name__ == "__main__": args.workload_realistic == None or args.workload_mixed == None ), "Cannot run both realistic and mixed workload, only one mode run at the time" + +def save_import_results(workload, results, import_results, rss_usage): + log.info("Summarized importing benchmark results:") + import_key = [workload.NAME, workload.get_variant(), IMPORT] + if import_results != None and rss_usage != None: + # Display import statistics. + for row in import_results: + log.success( + "Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format( + row[COUNT], row[DURATION], row[NUM_WORKERS], row[THROUGHPUT] + ) + ) + + log.success( + "The database used {} seconds of CPU time and peaked at {} MiB of RAM".format( + rss_usage[CPU], rss_usage[MEMORY] / (1024 * 1024) + ) + ) + results.set_value(*import_key, value={CLIENT: import_results, DATABASE: rss_usage}) + else: + results.set_value(*import_key, value={CLIENT: CUSTOM_LOAD, DATABASE: CUSTOM_LOAD}) + + +def log_metrics_summary(ret, usage): + log.log("Executed {} queries in {} seconds.".format(ret[COUNT], ret[DURATION])) + log.log("Queries have been retried {} times".format(ret[RETRIES])) + log.log("Database used {:.3f} seconds of CPU time.".format(usage[CPU])) + log.info("Database peaked at {:.3f} MiB of memory.".format(usage[MEMORY] / (1024.0 * 1024.0))) + + +def log_metadata_summary(ret): + log.log("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max")) + metadata = ret[METADATA] + for key in sorted(metadata.keys()): + log.log( + "{name:>30}: {minimum:>20.06f} {average:>20.06f} " "{maximum:>20.06f}".format(name=key, **metadata[key]) + ) + + +def log_output_summary(benchmark_context, ret, usage, funcname, sample_query): + log_metrics_summary(ret, usage) + if DOCKER not in benchmark_context.vendor_name: + log_metadata_summary(ret) + + log.info("\nResult:") + log.info(funcname) + log.info(sample_query) + log.success("Latency statistics:") + for key, value in ret[LATENCY_STATS].items(): + if key == ITERATIONS: + log.success("{:<10} {:>10}".format(key, value)) + else: + log.success("{:<10} {:>10.06f} seconds".format(key, value)) + log.success("Throughput: {:02f} QPS\n\n".format(ret[THROUGHPUT])) + + +def save_to_results(results, ret, workload, group, query, authorization_mode): + results_key = [ + workload.NAME, + workload.get_variant(), + group, + query, + authorization_mode, + ] + results.set_value(*results_key, value=ret) + + +def run_isolated_workload_with_authorization(vendor_runner, client, queries, group, workload): + log.init("Running isolated workload with authorization") + + log.info("Running preprocess AUTH queries") + vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION) + client.execute(queries=SETUP_AUTH_QUERIES) + client.set_credentials(username=USERNAME, password=PASSWORD) + vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION) + + for query, funcname in queries[group]: + log.init("Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION)) + func = getattr(workload, funcname) + count = get_query_cache_count( + vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func + ) + + 
+        vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
+        warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
+
+        ret = client.execute(
+            queries=get_queries(func, count),
+            num_workers=benchmark_context.num_workers_for_benchmark,
+        )[0]
+        usage = vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
+
+        ret[DATABASE] = usage
+        log_metrics_summary(ret, usage)
+        log_metadata_summary(ret)
+        log.success("Throughput: {:02f} QPS".format(ret[THROUGHPUT]))
+        save_to_results(results, ret, workload, group, query, WITH_FINE_GRAINED_AUTHORIZATION)
+
+    vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
+    log.info("Running cleanup of auth queries")
+    ret = client.execute(queries=CLEANUP_AUTH_QUERIES)
+    vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
+
+
+def run_isolated_workload_without_authorization(vendor_runner, client, queries, group, workload):
+    log.init("Running isolated workload without authorization")
+    for query, funcname in queries[group]:
+        log.init(
+            "Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITHOUT_FINE_GRAINED_AUTHORIZATION),
+        )
+        func = getattr(workload, funcname)
+        count = get_query_cache_count(
+            vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
+        )
+
+        # Benchmark run.
+        sample_query = get_queries(func, 1)[0][0]
+        log.info("Sample query:{}".format(sample_query))
+        log.log(
+            "Executing benchmark with {} queries that should yield a single-threaded runtime of {} seconds.".format(
+                count, benchmark_context.single_threaded_runtime_sec
+            )
+        )
+        log.log("Queries are executed using {} concurrent clients".format(benchmark_context.num_workers_for_benchmark))
+
+        rss_db = workload.NAME + workload.get_variant() + "_" + "_" + benchmark_context.mode + "_" + query
+        vendor_runner.start_db(rss_db)
+        warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
+        log.init("Executing benchmark queries...")
+        ret = client.execute(
+            queries=get_queries(func, count),
+            num_workers=benchmark_context.num_workers_for_benchmark,
+            time_dependent_execution=benchmark_context.time_dependent_execution,
+        )[0]
+
+        log.info("Benchmark execution finished...")
+        usage = vendor_runner.stop_db(rss_db)
+
+        ret[DATABASE] = usage
+        log_output_summary(benchmark_context, ret, usage, funcname, sample_query)
+
+        save_to_results(results, ret, workload, group, query, WITHOUT_FINE_GRAINED_AUTHORIZATION)
+
+
+def setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload):
+    vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT)
+    log.info("Executing database index setup")
+
+    if generated_queries:
+        client.execute(queries=workload.indexes_generator(), num_workers=1)
+        log.info("Finished setting up indexes.")
+        log.info("Started importing dataset")
+        import_results = client.execute(queries=generated_queries, num_workers=benchmark_context.num_workers_for_import)
+    else:
+        log.info("Using workload information for importing dataset and creating indices")
+        log.info("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
+        workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
+        imported = workload.custom_import()
+        if not imported:
+            client.execute(file_path=workload.get_index(), num_workers=1)
+            log.info("Finished setting up indexes.")
+            log.info("Started importing dataset")
+            import_results = client.execute(
+                file_path=workload.get_file(), num_workers=benchmark_context.num_workers_for_import
+            )
+        else:
log.info("Custom import executed") + + log.info("Finished importing dataset") + rss_usage = vendor_runner.stop_db_init(VENDOR_RUNNER_IMPORT) + + return import_results, rss_usage + + +def run_target_workload(benchmark_context, workload, bench_queries, vendor_runner, client): + generated_queries = workload.dataset_generator() + import_results, rss_usage = setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload) + save_import_results(workload, results, import_results, rss_usage) + + for group in sorted(bench_queries.keys()): + log.init(f"\nRunning benchmark in {benchmark_context.mode} workload mode for {group} group") + if benchmark_context.mode == BENCHMARK_MODE_MIXED: + mixed_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context) + elif benchmark_context.mode == BENCHMARK_MODE_REALISTIC: + realistic_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context) + else: + run_isolated_workload_without_authorization(vendor_runner, client, bench_queries, group, workload) + + if benchmark_context.no_authorization: + run_isolated_workload_with_authorization(vendor_runner, client, bench_queries, group, workload) + + +# TODO: (andi) Reorder functions in top-down notion in order to improve readibility +def run_target_workloads(benchmark_context, target_workloads): + for workload, bench_queries in target_workloads: + log.info(f"Started running {str(workload.NAME)} workload") + + benchmark_context.set_active_workload(workload.NAME) + benchmark_context.set_active_variant(workload.get_variant()) + + log.info(f"Running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.") + in_memory_txn_vendor_runner, in_memory_txn_client = client_runner_factory(benchmark_context) + run_target_workload( + benchmark_context, workload, bench_queries, in_memory_txn_vendor_runner, in_memory_txn_client + ) + log.info(f"Finished running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.") + + +def client_runner_factory(benchmark_context): + vendor_runner = runners.BaseRunner.create(benchmark_context=benchmark_context) + vendor_runner.clean_db() + log.log("Database cleaned from any previous data") + client = vendor_runner.fetch_client() + return vendor_runner, client + + +def validate_target_workloads(benchmark_context, target_workloads): + if len(target_workloads) == 0: + log.error("No workloads matched the pattern: " + str(benchmark_context.benchmark_target_workload)) + log.error("Please check the pattern and workload NAME property, query group and query name.") + log.info("Currently available workloads: ") + log.log(helpers.list_available_workloads(benchmark_context.customer_workloads)) + sys.exit(1) + + +if __name__ == "__main__": + args = parse_args() + sanitize_args(args) + vendor_specific_args = helpers.parse_kwargs(args.vendor_specific) + temp_dir = pathlib.Path.cwd() / ".temp" temp_dir.mkdir(parents=True, exist_ok=True) @@ -461,346 +842,47 @@ if __name__ == "__main__": no_authorization=args.no_authorization, customer_workloads=args.customer_workloads, vendor_args=vendor_specific_args, + disk_storage=args.disk_storage, + in_memory_analytical=args.in_memory_analytical, ) - log.init("Executing benchmark with following arguments: ") - for key, value in benchmark_context.__dict__.items(): - log.log("{:<30} : {:<30}".format(str(key), str(value))) + log_benchmark_arguments(benchmark_context) + check_benchmark_requirements(benchmark_context) - log.init("Check requirements for running benchmark") - if 
-    if setup.check_requirements(benchmark_context=benchmark_context):
-        log.success("Requirements satisfied... ")
-    else:
-        log.warning("Requirements not satisfied...")
-        sys.exit(1)
-
-    log.log("Creating cache folder for: dataset, configurations, indexes, results etc. ")
     cache = helpers.Cache()
-    log.init("Folder in use: " + cache.get_default_cache_directory())
-    if not benchmark_context.no_load_query_counts:
-        log.log("Using previous cached query count data from cache directory.")
-        config = cache.load_config()
-    else:
-        config = helpers.RecursiveDict()
-    results = helpers.RecursiveDict()
+    log.log("Creating cache folder for dataset, configurations, indexes and results.")
+    log.log("Cache folder in use: " + cache.get_default_cache_directory())
+    config = setup_cache_config(benchmark_context, cache)
 
     run_config = {
-        "vendor": benchmark_context.vendor_name,
-        "condition": benchmark_context.warm_up,
-        "num_workers_for_benchmark": benchmark_context.num_workers_for_benchmark,
-        "single_threaded_runtime_sec": benchmark_context.single_threaded_runtime_sec,
-        "benchmark_mode": benchmark_context.mode,
-        "benchmark_mode_config": benchmark_context.mode_config,
-        "platform": platform.platform(),
+        VENDOR: benchmark_context.vendor_name,
+        CONDITION: benchmark_context.warm_up,
+        NUM_WORKERS_FOR_BENCHMARK: benchmark_context.num_workers_for_benchmark,
+        SINGLE_THREADED_RUNTIME_SEC: benchmark_context.single_threaded_runtime_sec,
+        BENCHMARK_MODE: benchmark_context.mode,
+        BENCHMARK_MODE_CONFIG: benchmark_context.mode_config,
+        PLATFORM: platform.platform(),
     }
 
-    results.set_value("__run_configuration__", value=run_config)
+    results = helpers.RecursiveDict()
+    results.set_value(RUN_CONFIGURATION, value=run_config)
 
     available_workloads = helpers.get_available_workloads(benchmark_context.customer_workloads)
 
     # Filter out the workloads based on the pattern
+    # TODO: (andi) Maybe filter the workloads here when running on disk storage.
     target_workloads = helpers.filter_workloads(
         available_workloads=available_workloads, benchmark_context=benchmark_context
    )
 
-    if len(target_workloads) == 0:
-        log.error("No workloads matched the pattern: " + str(benchmark_context.benchmark_target_workload))
-        log.error("Please check the pattern and workload NAME property, query group and query name.")
-        log.info("Currently available workloads: ")
-        log.log(helpers.list_available_workloads(benchmark_context.customer_workloads))
-        sys.exit(1)
+    validate_target_workloads(benchmark_context, target_workloads)
+    run_target_workloads(benchmark_context, target_workloads)
 
-    # Run all target workloads.
-    for workload, queries in target_workloads:
-        log.info("Started running following workload: " + str(workload.NAME))
-
-        benchmark_context.set_active_workload(workload.NAME)
-        benchmark_context.set_active_variant(workload.get_variant())
-
-        log.init("Creating vendor runner for DB: " + benchmark_context.vendor_name)
-        vendor_runner = runners.BaseRunner.create(
-            benchmark_context=benchmark_context,
-        )
-        log.log("Class in use: " + str(vendor_runner.__class__.__name__))
-
-        log.info("Cleaning the database from any previous data")
-        vendor_runner.clean_db()
-
-        client = vendor_runner.fetch_client()
-        log.log("Get appropriate client for vendor " + str(client.__class__.__name__))
-
-        ret = None
-        usage = None
-
-        generated_queries = workload.dataset_generator()
-        if generated_queries:
-            print("\n")
-            log.info("Using workload as dataset generator...")
-
-            vendor_runner.start_db_init("import")
-
-            log.warning("Using following indexes...")
-            log.info(workload.indexes_generator())
-            log.info("Executing database index setup...")
-            ret = client.execute(queries=workload.indexes_generator(), num_workers=1)
-            log.log("Finished setting up indexes...")
-            for row in ret:
-                log.success(
-                    "Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format(
-                        row["count"], row["duration"], row["num_workers"], row["throughput"]
-                    )
-                )
-
-            log.info("Importing dataset...")
-            ret = client.execute(queries=generated_queries, num_workers=benchmark_context.num_workers_for_import)
-            log.log("Finished importing dataset...")
-            usage = vendor_runner.stop_db_init("import")
-        else:
-            log.init("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
-            workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
-            log.info("Using workload dataset information for import...")
-            imported = workload.custom_import()
-            if not imported:
-                log.log("Basic import execution")
-                vendor_runner.start_db_init("import")
-                log.log("Executing database index setup...")
-                client.execute(file_path=workload.get_index(), num_workers=1)
-                log.log("Importing dataset...")
-                ret = client.execute(
-                    file_path=workload.get_file(), num_workers=benchmark_context.num_workers_for_import
-                )
-                usage = vendor_runner.stop_db_init("import")
-            else:
-                log.info("Custom import executed...")
-
-        # Save import results.
-        import_key = [workload.NAME, workload.get_variant(), "__import__"]
-        if ret != None and usage != None:
-            # Display import statistics.
-            for row in ret:
-                log.success(
-                    "Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format(
-                        row["count"], row["duration"], row["num_workers"], row["throughput"]
-                    )
-                )
-
-            log.success(
-                "The database used {} seconds of CPU time and peaked at {} MiB of RAM".format(
-                    usage["cpu"], usage["memory"] / 1024 / 1024
-                )
-            )
-
-            results.set_value(*import_key, value={"client": ret, "database": usage})
-        else:
-            results.set_value(*import_key, value={"client": "custom_load", "database": "custom_load"})
-
-        # Run all benchmarks in all available groups.
-        for group in sorted(queries.keys()):
-            print("\n")
-            log.init("Running benchmark in " + benchmark_context.mode)
-            if benchmark_context.mode == "Mixed":
-                mixed_workload(vendor_runner, client, workload, group, queries, benchmark_context)
-            elif benchmark_context.mode == "Realistic":
-                mixed_workload(vendor_runner, client, workload, group, queries, benchmark_context)
-            else:
-                for query, funcname in queries[group]:
-                    log.init(
-                        "Running query:"
-                        + "{}/{}/{}/{}".format(group, query, funcname, WITHOUT_FINE_GRAINED_AUTHORIZATION),
-                    )
-                    func = getattr(workload, funcname)
-
-                    # Query count
-                    config_key = [
-                        workload.NAME,
-                        workload.get_variant(),
-                        group,
-                        query,
-                    ]
-                    log.init("Determining query count for benchmark based on --single-threaded-runtime argument")
-                    count = get_query_cache_count(
-                        vendor_runner, client, get_queries(func, 1), config_key, benchmark_context
-                    )
-
-                    # Benchmark run.
-                    sample_query = get_queries(func, 1)[0][0]
-                    log.info("Sample query:{}".format(sample_query))
-                    log.log(
-                        "Executing benchmark with {} queries that should yield a single-threaded runtime of {} seconds.".format(
-                            count, benchmark_context.single_threaded_runtime_sec
-                        )
-                    )
-                    log.log(
-                        "Queries are executed using {} concurrent clients".format(
-                            benchmark_context.num_workers_for_benchmark
-                        )
-                    )
-
-                    vendor_runner.start_db(
-                        workload.NAME + workload.get_variant() + "_" + "_" + benchmark_context.mode + "_" + query
-                    )
-                    warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
-                    log.init("Executing benchmark queries...")
-                    if benchmark_context.time_dependent_execution != 0:
-                        ret = client.execute(
-                            queries=get_queries(func, count),
-                            num_workers=benchmark_context.num_workers_for_benchmark,
-                            time_dependent_execution=benchmark_context.time_dependent_execution,
-                        )[0]
-                    else:
-                        ret = client.execute(
-                            queries=get_queries(func, count),
-                            num_workers=benchmark_context.num_workers_for_benchmark,
-                        )[0]
-
-                    log.info("Benchmark execution finished...")
-                    usage = vendor_runner.stop_db(
-                        workload.NAME + workload.get_variant() + "_" + benchmark_context.mode + "_" + query
-                    )
-                    ret["database"] = usage
-
-                    # Output summary.
-                    log.log("Executed {} queries in {} seconds.".format(ret["count"], ret["duration"]))
-                    log.log("Queries have been retried {} times".format(ret["retries"]))
-                    log.log("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
-                    log.info("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
-                    if "docker" not in benchmark_context.vendor_name:
-                        log.log("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
-                        metadata = ret["metadata"]
-                        for key in sorted(metadata.keys()):
-                            log.log(
-                                "{name:>30}: {minimum:>20.06f} {average:>20.06f} "
-                                "{maximum:>20.06f}".format(name=key, **metadata[key])
-                            )
-                    print("\n")
-                    log.info("Result:")
-                    log.info(funcname)
-                    log.info(sample_query)
-                    log.success("Latency statistics:")
-                    for key, value in ret["latency_stats"].items():
-                        if key == "iterations":
-                            log.success("{:<10} {:>10}".format(key, value))
-                        else:
-                            log.success("{:<10} {:>10.06f} seconds".format(key, value))
-
-                    log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
-                    print("\n\n")
-
-                    # Save results.
-                    results_key = [
-                        workload.NAME,
-                        workload.get_variant(),
-                        group,
-                        query,
-                        WITHOUT_FINE_GRAINED_AUTHORIZATION,
-                    ]
-                    results.set_value(*results_key, value=ret)
-
-            # If there is need for authorization testing.
-            if benchmark_context.no_authorization:
-                log.init("Running queries with authorization...")
-                log.info("Setting USER and PRIVILEGES...")
-                vendor_runner.start_db("authorization")
-                client.execute(
-                    queries=[
-                        ("CREATE USER user IDENTIFIED BY 'test';", {}),
-                        ("GRANT ALL PRIVILEGES TO user;", {}),
-                        ("GRANT CREATE_DELETE ON EDGE_TYPES * TO user;", {}),
-                        ("GRANT CREATE_DELETE ON LABELS * TO user;", {}),
-                    ]
-                )
-
-                client.set_credentials(username="user", password="test")
-                vendor_runner.stop_db("authorization")
-
-                for query, funcname in queries[group]:
-                    log.init(
-                        "Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION)
-                    )
-                    func = getattr(workload, funcname)
-
-                    config_key = [
-                        workload.NAME,
-                        workload.get_variant(),
-                        group,
-                        query,
-                    ]
-                    count = get_query_cache_count(
-                        vendor_runner, client, get_queries(func, 1), config_key, benchmark_context
-                    )
-
-                    vendor_runner.start_db("authorization")
-                    warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
-
-                    ret = client.execute(
-                        queries=get_queries(func, count),
-                        num_workers=benchmark_context.num_workers_for_benchmark,
-                    )[0]
-                    usage = vendor_runner.stop_db("authorization")
-                    ret["database"] = usage
-
-                    # Output summary.
-                    log.log("Executed {} queries in {} seconds.".format(ret["count"], ret["duration"]))
-                    log.log("Queries have been retried {} times".format(ret["retries"]))
-                    log.log("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
-                    log.log("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
-                    log.log("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
-                    metadata = ret["metadata"]
-                    for key in sorted(metadata.keys()):
-                        log.log(
-                            "{name:>30}: {minimum:>20.06f} {average:>20.06f} "
-                            "{maximum:>20.06f}".format(name=key, **metadata[key])
-                        )
-                    log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
-
-                    # Save results.
-                    results_key = [
-                        workload.NAME,
-                        workload.get_variant(),
-                        group,
-                        query,
-                        WITH_FINE_GRAINED_AUTHORIZATION,
-                    ]
-                    results.set_value(*results_key, value=ret)
-
-                log.info("Deleting USER and PRIVILEGES...")
-                vendor_runner.start_db("authorizations")
-                ret = client.execute(
-                    queries=[
-                        ("REVOKE LABELS * FROM user;", {}),
-                        ("REVOKE EDGE_TYPES * FROM user;", {}),
-                        ("DROP USER user;", {}),
-                    ]
-                )
-                vendor_runner.stop_db("authorization")
-
-    # Save configuration.
     if not benchmark_context.no_save_query_counts:
         cache.save_config(config)
 
-    # Export results.
+    log_benchmark_summary(results.get_data())
+
     if benchmark_context.export_results:
         with open(benchmark_context.export_results, "w") as f:
            json.dump(results.get_data(), f)
-
-    # Results summary.
- log.init("~" * 45) - log.info("Benchmark finished.") - log.init("~" * 45) - log.log("\n") - log.summary("Benchmark summary") - log.log("-" * 90) - log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage")) - with open(benchmark_context.export_results, "r") as f: - results = json.load(f) - for dataset, variants in results.items(): - if dataset == "__run_configuration__": - continue - for variant, groups in variants.items(): - for group, queries in groups.items(): - if group == "__import__": - continue - for query, auth in queries.items(): - for key, value in auth.items(): - log.log("-" * 90) - log.summary( - "{:<20} {:>26.2f} QPS {:>27.2f} MB".format( - query, value["throughput"], value["database"]["memory"] / 1024.0 / 1024.0 - ) - ) - log.log("-" * 90) diff --git a/tests/mgbench/benchmark_context.py b/tests/mgbench/benchmark_context.py index 71f507fdc..aa1c098bd 100644 --- a/tests/mgbench/benchmark_context.py +++ b/tests/mgbench/benchmark_context.py @@ -9,8 +9,13 @@ # by the Apache License, Version 2.0, included in the file # licenses/APL.txt. +from workload_mode import ( + BENCHMARK_MODE_ISOLATED, + BENCHMARK_MODE_MIXED, + BENCHMARK_MODE_REALISTIC, +) + -# Describes all the information of single benchmark.py run. class BenchmarkContext: """ Class for holding information on what type of benchmark is being executed @@ -38,6 +43,8 @@ class BenchmarkContext: no_authorization: bool = True, customer_workloads: str = None, vendor_args: dict = {}, + disk_storage: bool = False, + in_memory_analytical: bool = False, ) -> None: self.benchmark_target_workload = benchmark_target_workload self.vendor_binary = vendor_binary @@ -52,15 +59,19 @@ class BenchmarkContext: self.export_results = export_results self.temporary_directory = temporary_directory + assert ( + workload_mixed is None or workload_realistic is None + ), "Cannot run both mixed and realistic workload, please select one!" + if workload_mixed != None: - self.mode = "Mixed" + self.mode = BENCHMARK_MODE_MIXED self.mode_config = workload_mixed elif workload_realistic != None: - self.mode = "Realistic" + self.mode = BENCHMARK_MODE_REALISTIC self.mode_config = workload_realistic else: - self.mode = "Isolated" - self.mode_config = "Isolated run does not have a config." + self.mode = BENCHMARK_MODE_ISOLATED + self.mode_config = None self.time_dependent_execution = time_dependent_execution self.performance_tracking = performance_tracking @@ -70,6 +81,8 @@ class BenchmarkContext: self.vendor_args = vendor_args self.active_workload = None self.active_variant = None + self.disk_storage = disk_storage + self.in_memory_analytical = in_memory_analytical def set_active_workload(self, workload: str) -> None: self.active_workload = workload diff --git a/tests/mgbench/graph_bench.py b/tests/mgbench/graph_bench.py index bcba55324..f329cfcb7 100644 --- a/tests/mgbench/graph_bench.py +++ b/tests/mgbench/graph_bench.py @@ -127,6 +127,8 @@ def run_full_benchmarks( ], ] + assert not realistic or not mixed, "Cannot run both realistic and mixed workload, please select one!" 
+
     if realistic:
         # Configurations for full workload
         for count, write, read, update, analytical in realistic:
diff --git a/tests/mgbench/helpers.py b/tests/mgbench/helpers.py
index d90cbe9a3..b860440f9 100644
--- a/tests/mgbench/helpers.py
+++ b/tests/mgbench/helpers.py
@@ -188,6 +188,7 @@ def filter_workloads(available_workloads: dict, benchmark_context: BenchmarkCont
             raise Exception("Invalid benchmark description '" + pattern + "'!")
         pattern.extend(["", "*", "*"][len(pattern) - 1 :])
         patterns[i] = pattern
+
     filtered = []
     for workload in sorted(available_workloads.keys()):
         generator, queries = available_workloads[workload]
diff --git a/tests/mgbench/workload_mode.py b/tests/mgbench/workload_mode.py
new file mode 100644
index 000000000..096526d12
--- /dev/null
+++ b/tests/mgbench/workload_mode.py
@@ -0,0 +1,14 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+BENCHMARK_MODE_MIXED = "Mixed"
+BENCHMARK_MODE_REALISTIC = "Realistic"
+BENCHMARK_MODE_ISOLATED = "Isolated"
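+
+# Shared by benchmark.py and benchmark_context.py so the benchmark mode names
+# are defined in exactly one place.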