From 2fd34489af691524affbe135bdce516927d51ef8 Mon Sep 17 00:00:00 2001
From: Andi <andi8647@gmail.com>
Date: Fri, 6 Oct 2023 10:19:29 +0200
Subject: [PATCH] Add mgbench support for disk storage and analytical mode
 (#1286)

* Add mgbench support for disk storage and analytical mode
---
 .github/workflows/diff.yaml                        |   5 +-
 tests/mgbench/README.md                            |   6 +
 tests/mgbench/benchmark.py                         | 669 +++++++++---------
 tests/mgbench/benchmark_context.py                 |   8 +-
 tests/mgbench/benchmark_results.py                 |  19 +
 tests/mgbench/constants.py                         |  56 ++
 tests/mgbench/helpers.py                           |   6 +-
 tests/mgbench/log.py                               |  17 +
 tests/mgbench/runners.py                           |   6 -
 tests/mgbench/workloads/base.py                    | 220 ++++--
 tests/mgbench/workloads/disk_pokec.py              | 449 ++++++++++++
 .../importers/disk_importer_pokec.py               |  62 ++
 tools/bench-graph-client/main.py                   |  67 +-
 13 files changed, 1185 insertions(+), 405 deletions(-)
 create mode 100644 tests/mgbench/benchmark_results.py
 create mode 100644 tests/mgbench/constants.py
 create mode 100644 tests/mgbench/workloads/disk_pokec.py
 create mode 100644 tests/mgbench/workloads/importers/disk_importer_pokec.py

diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml
index 0fd4a930b..08be26907 100644
--- a/.github/workflows/diff.yaml
+++ b/.github/workflows/diff.yaml
@@ -417,11 +417,12 @@ jobs:
           source ve3/bin/activate
           pip install -r requirements.txt
           ./main.py --benchmark-name "macro_benchmark" \
-            --benchmark-results-path "../../tests/macro_benchmark/.harness_summary" \
+            --benchmark-results "../../tests/macro_benchmark/.harness_summary" \
            --github-run-id "${{ github.run_id }}" \
            --github-run-number "${{ github.run_number }}" \
            --head-branch-name "${{ env.BRANCH_NAME }}"

+      # TODO (andi) No need for path flags and for --disk-storage and --in-memory-analytical
       - name: Run mgbench
         run: |
           cd tests/mgbench
@@ -434,7 +435,7 @@ jobs:
           source ve3/bin/activate
           pip install -r requirements.txt
           ./main.py --benchmark-name "mgbench" \
-            --benchmark-results-path "../../tests/mgbench/benchmark_result.json" \
+            --benchmark-results "../../tests/mgbench/benchmark_result.json" \
            --github-run-id "${{ github.run_id }}" \
            --github-run-number "${{ github.run_number }}" \
            --head-branch-name "${{ env.BRANCH_NAME }}"
diff --git a/tests/mgbench/README.md b/tests/mgbench/README.md
index 813090f66..f4181cf11 100644
--- a/tests/mgbench/README.md
+++ b/tests/mgbench/README.md
@@ -342,6 +342,12 @@ Benchgraph is currently a passive benchmark since resource usage and saturation

 Latest version: https://memgraph.com/benchgraph

+### Release v3 (latest) - 2023-02-10
+
+- Improvements made for Memgraph internal performance testing:
+  - https://github.com/memgraph/memgraph/pull/1286
+  - https://github.com/memgraph/memgraph/pull/1280
+
 ### Release v2 (latest) - 2023-25-04

 - Benchmark process changes:
diff --git a/tests/mgbench/benchmark.py b/tests/mgbench/benchmark.py
index 12af6ead7..cd3fb846f 100755
--- a/tests/mgbench/benchmark.py
+++ b/tests/mgbench/benchmark.py
@@ -19,6 +19,7 @@ import platform
 import random
 import sys
 import time
+from copy import deepcopy
 from typing import Dict, List

 import helpers
@@ -26,54 +27,11 @@ import log
 import runners
 import setup
 from benchmark_context import BenchmarkContext
+from benchmark_results import BenchmarkResults
+from constants import *
 from workload_mode import BENCHMARK_MODE_MIXED, BENCHMARK_MODE_REALISTIC
 from workloads import *

-WITH_FINE_GRAINED_AUTHORIZATION = "with_fine_grained_authorization"
-WITHOUT_FINE_GRAINED_AUTHORIZATION = "without_fine_grained_authorization"
-RUN_CONFIGURATION = "__run_configuration__"
-IMPORT = "__import__"
-THROUGHPUT = "throughput"
-DATABASE = "database"
-MEMORY = "memory"
-VENDOR = "vendor"
-CONDITION = "condition"
-NUM_WORKERS_FOR_BENCHMARK = "num_workers_for_benchmark"
-SINGLE_THREADED_RUNTIME_SEC = "single_threaded_runtime_sec"
-BENCHMARK_MODE = "benchmark_mode"
-BENCHMARK_MODE_CONFIG = "benchmark_mode_config"
-PLATFORM = "platform"
-COUNT = "count"
-DURATION = "duration"
-NUM_WORKERS = "num_workers"
-CPU = "cpu"
-MEMORY = "memory"
-VENDOR_RUNNER_IMPORT = "import"
-VENDOR_RUNNER_AUTHORIZATION = "authorization"
-CLIENT = "client"
-DATABASE = "database"
-CUSTOM_LOAD = "custom_load"
-RETRIES = "retries"
-DOCKER = "docker"
-METADATA = "metadata"
-LATENCY_STATS = "latency_stats"
-ITERATIONS = "iterations"
-USERNAME = "user"
-PASSWORD = "test"
-DATABASE_CONDITION_HOT = "hot"
-DATABASE_CONDITION_VULCANIC = "vulcanic"
-WRITE_TYPE_QUERY = "write"
-READ_TYPE_QUERY = "read"
-UPDATE_TYPE_QUERY = "update"
-ANALYTICAL_TYPE_QUERY = "analytical"
-QUERY = "query"
-CACHE = "cache"
-DISK_PREPARATION_RSS = "disk_storage_preparation"
-IN_MEMORY_ANALYTICAL_RSS = "in_memory_analytical_preparation"
-
-IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL"
-
-
 WARMUP_TO_HOT_QUERIES = [
     ("CREATE ();", {}),
     ("CREATE ()-[:TempEdge]->();", {}),
@@ -161,7 +119,19 @@ def parse_args():
     benchmark_parser.add_argument(
         "--export-results",
         default=None,
-        help="file path into which results should be exported",
+        help="file path into which results for in_memory_transactional storage mode should be exported",
     )
+
+    benchmark_parser.add_argument(
+        "--export-results-in-memory-analytical",
+        default=None,
+        help="File path into which results for in_memory_analytical storage mode should be exported. If set, benchmarks for analytical mode will be run.",
+    )
+
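+    # Each storage mode exports to its own results file; for the analytical and
+    # on-disk modes the flag also opts the mode into the benchmark run.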
+    benchmark_parser.add_argument(
+        "--export-results-on-disk-txn",
+        default=None,
+        help="File path into which results for on_disk_transactional storage mode should be exported. If set, benchmarks for disk storage will be run.",
+    )

     benchmark_parser.add_argument(
@@ -220,20 +190,6 @@ def parse_args():

     benchmark_parser.add_argument("--customer-workloads", default=None, help="Path to customers workloads")

-    benchmark_parser.add_argument(
-        "--disk-storage",
-        action="store_true",
-        default=False,
-        help="If the flag set, benchmarks will be run also for disk storage.",
-    )
-
-    benchmark_parser.add_argument(
-        "--in-memory-analytical",
-        action="store_true",
-        default=False,
-        help="If the flag set, benchmarks will be run also for in_memory_analytical.",
-    )
-
     benchmark_parser.add_argument(
         "--vendor-specific",
         nargs="*",
@@ -279,6 +235,17 @@ def parse_args():
     return parser.parse_args()


+def sanitize_args(args):
+    assert args.benchmarks != None, helpers.list_available_workloads()
+    assert args.num_workers_for_import > 0
+    assert args.num_workers_for_benchmark > 0
+    assert args.export_results != None, "Pass a file path where the results will be saved"
+    assert args.single_threaded_runtime_sec >= 1, "Low runtime value, consider extending time for more accurate results"
+    assert (
+        args.workload_realistic == None or args.workload_mixed == None
+    ), "Cannot run both realistic and mixed workload, only one mode run at the time"
+
+
 def get_queries(gen, count):
     random.seed(gen.__name__)
     ret = []
@@ -287,21 +254,6 @@ def get_queries(gen, count):
     return ret


-def warmup(condition: str, client: runners.BaseRunner, queries: list = None):
-    if condition == DATABASE_CONDITION_HOT:
-        log.log("Execute warm-up to match condition: {} ".format(condition))
-        client.execute(
-            queries=WARMUP_TO_HOT_QUERIES,
-            num_workers=1,
-        )
-    elif condition == DATABASE_CONDITION_VULCANIC:
-        log.log("Execute warm-up to match condition: {} ".format(condition))
-        client.execute(queries=queries)
-    else:
-        log.log("No warm-up on condition: {} ".format(condition))
-    log.log("Finished warm-up procedure to match database condition: {} ".format(condition))
-
-
 def validate_workload_distribution(percentage_distribution, queries_by_type):
     percentages_by_type = {
         WRITE_TYPE_QUERY: percentage_distribution[0],
@@ -360,7 +312,13 @@ def prepare_for_workload(benchmark_context, dataset, group, queries):


 def realistic_workload(
-    vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
+    vendor: runners.BaseRunner,
+    client: runners.BaseClient,
+    dataset,
+    group,
+    queries,
+    benchmark_context: BenchmarkContext,
+    results,
 ):
     log.log("Executing realistic workload...")
     config_distribution, queries_by_type, _, percentage_distribution, num_of_queries = prepare_for_workload(
@@ -407,7 +365,13 @@ def realistic_workload(


 def mixed_workload(
-    vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
+    vendor: runners.BaseRunner,
+    client: runners.BaseClient,
+    dataset,
+    group,
+    queries,
+    benchmark_context: BenchmarkContext,
+    results,
 ):
     log.log("Executing mixed workload...")
     (
@@ -467,6 +431,21 @@ def mixed_workload(
     results.set_value(*results_key, value=ret)

+
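+# Warm-up before the timed run: "hot" executes a fixed set of priming queries,
+# "vulcanic" replays the full benchmark query set, any other condition skips warm-up.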
".format(condition)) + log.log("Finished warm-up procedure to match database condition: {} ".format(condition)) + + def get_query_cache_count( vendor: runners.BaseRunner, client: runners.BaseClient, @@ -477,16 +456,12 @@ def get_query_cache_count( query: str, func: str, ): - log.init("Determining query count for benchmark based on --single-threaded-runtime argument") + log.init( + f"Determining query count for benchmark based on --single-threaded-runtime argument = {benchmark_context.single_threaded_runtime_sec}s" + ) config_key = [workload.NAME, workload.get_variant(), group, query] cached_count = config.get_value(*config_key) if cached_count is None: - log.info( - "Determining the number of queries necessary for {} seconds of single-threaded runtime...".format( - benchmark_context.single_threaded_runtime_sec - ) - ) - log.log("Running query to prime the query cache...") vendor.start_db(CACHE) client.execute(queries=queries, num_workers=1) count = 1 @@ -495,18 +470,16 @@ def get_query_cache_count( duration = ret[0][DURATION] should_execute = int(benchmark_context.single_threaded_runtime_sec / (duration / count)) log.log( - "executed_queries={}, total_duration={}, query_duration={}, estimated_count={}".format( + "Executed_queries={}, total_duration={}, query_duration={}, estimated_count={}".format( count, duration, duration / count, should_execute ) ) - # We don't have to execute the next iteration when - # `should_execute` becomes the same order of magnitude as - # `count * 10`. if should_execute / (count * 10) < 10: count = should_execute break else: count = count * 10 + vendor.stop_db(CACHE) if count < benchmark_context.query_count_lower_bound: @@ -529,38 +502,6 @@ def get_query_cache_count( return count -def log_benchmark_summary(results: Dict): - log.init("~" * 45) - log.info("Benchmark finished.") - log.init("~" * 45) - log.log("\n") - log.summary("Benchmark summary") - log.log("-" * 90) - log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage")) - for dataset, variants in results.items(): - if dataset == RUN_CONFIGURATION: - continue - for groups in variants.values(): - for group, queries in groups.items(): - if group == IMPORT: - continue - for query, auth in queries.items(): - for value in auth.values(): - log.log("-" * 90) - log.summary( - "{:<20} {:>26.2f} QPS {:>27.2f} MB".format( - query, value[THROUGHPUT], value[DATABASE][MEMORY] / (1024.0 * 1024.0) - ) - ) - log.log("-" * 90) - - -def log_benchmark_arguments(benchmark_context): - log.init("Executing benchmark with following arguments: ") - for key, value in benchmark_context.__dict__.items(): - log.log("{:<30} : {:<30}".format(str(key), str(value))) - - def check_benchmark_requirements(benchmark_context): if setup.check_requirements(benchmark_context): log.success("Requirements for starting benchmark satisfied!") @@ -577,19 +518,6 @@ def setup_cache_config(benchmark_context, cache): return helpers.RecursiveDict() -def sanitize_args(args): - assert args.benchmarks != None, helpers.list_available_workloads() - assert args.num_workers_for_import > 0 - assert args.num_workers_for_benchmark > 0 - assert args.export_results != None, "Pass where will results be saved" - assert ( - args.single_threaded_runtime_sec >= 10 - ), "Low runtime value, consider extending time for more accurate results" - assert ( - args.workload_realistic == None or args.workload_mixed == None - ), "Cannot run both realistic and mixed workload, only one mode run at the time" - - def save_import_results(workload, results, 
             if should_execute / (count * 10) < 10:
                 count = should_execute
                 break
             else:
                 count = count * 10
+
         vendor.stop_db(CACHE)

     if count < benchmark_context.query_count_lower_bound:
@@ -529,38 +502,6 @@ def get_query_cache_count(
     return count


-def log_benchmark_summary(results: Dict):
-    log.init("~" * 45)
-    log.info("Benchmark finished.")
-    log.init("~" * 45)
-    log.log("\n")
-    log.summary("Benchmark summary")
-    log.log("-" * 90)
-    log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage"))
-    for dataset, variants in results.items():
-        if dataset == RUN_CONFIGURATION:
-            continue
-        for groups in variants.values():
-            for group, queries in groups.items():
-                if group == IMPORT:
-                    continue
-                for query, auth in queries.items():
-                    for value in auth.values():
-                        log.log("-" * 90)
-                        log.summary(
-                            "{:<20} {:>26.2f} QPS {:>27.2f} MB".format(
-                                query, value[THROUGHPUT], value[DATABASE][MEMORY] / (1024.0 * 1024.0)
-                            )
-                        )
-    log.log("-" * 90)
-
-
-def log_benchmark_arguments(benchmark_context):
-    log.init("Executing benchmark with following arguments: ")
-    for key, value in benchmark_context.__dict__.items():
-        log.log("{:<30} : {:<30}".format(str(key), str(value)))
-
-
 def check_benchmark_requirements(benchmark_context):
     if setup.check_requirements(benchmark_context):
         log.success("Requirements for starting benchmark satisfied!")
@@ -577,19 +518,6 @@ def setup_cache_config(benchmark_context, cache):
         return helpers.RecursiveDict()


-def sanitize_args(args):
-    assert args.benchmarks != None, helpers.list_available_workloads()
-    assert args.num_workers_for_import > 0
-    assert args.num_workers_for_benchmark > 0
-    assert args.export_results != None, "Pass where will results be saved"
-    assert (
-        args.single_threaded_runtime_sec >= 10
-    ), "Low runtime value, consider extending time for more accurate results"
-    assert (
-        args.workload_realistic == None or args.workload_mixed == None
-    ), "Cannot run both realistic and mixed workload, only one mode run at the time"
-
-
 def save_import_results(workload, results, import_results, rss_usage):
     log.info("Summarized importing benchmark results:")
     import_key = [workload.NAME, workload.get_variant(), IMPORT]
@@ -597,21 +525,279 @@ def save_import_results(workload, results, import_results, rss_usage):
         # Display import statistics.
         for row in import_results:
             log.success(
-                "Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format(
-                    row[COUNT], row[DURATION], row[NUM_WORKERS], row[THROUGHPUT]
-                )
+                f"Executed {row[COUNT]} queries in {row[DURATION]} seconds using {row[NUM_WORKERS]} workers with a total throughput of {row[THROUGHPUT]} Q/S."
             )
         log.success(
-            "The database used {} seconds of CPU time and peaked at {} MiB of RAM".format(
-                rss_usage[CPU], rss_usage[MEMORY] / (1024 * 1024)
-            )
+            f"The database used {rss_usage[CPU]} seconds of CPU time and peaked at {rss_usage[MEMORY] / (1024 * 1024)} MiB of RAM"
         )
+
         results.set_value(*import_key, value={CLIENT: import_results, DATABASE: rss_usage})
     else:
         results.set_value(*import_key, value={CLIENT: CUSTOM_LOAD, DATABASE: CUSTOM_LOAD})


+def save_to_results(results, ret, workload, group, query, authorization_mode):
+    results_key = [
+        workload.NAME,
+        workload.get_variant(),
+        group,
+        query,
+        authorization_mode,
+    ]
+    results.set_value(*results_key, value=ret)
+
+
+def run_isolated_workload_with_authorization(vendor_runner, client, queries, group, workload, results):
+    log.init("Running isolated workload with authorization")
+
+    log.info("Running preprocess AUTH queries")
+    vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
+    client.execute(queries=SETUP_AUTH_QUERIES)
+    client.set_credentials(username=USERNAME, password=PASSWORD)
+    vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
+
+    for query, funcname in queries[group]:
+        log.init("Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION))
+        func = getattr(workload, funcname)
+        count = get_query_cache_count(
+            vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
+        )
+
+        vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
+        start_time = time.time()
+        warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
+
+        ret = client.execute(
+            queries=get_queries(func, count),
+            num_workers=benchmark_context.num_workers_for_benchmark,
+        )[0]
+        usage = vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
+        time_elapsed = time.time() - start_time
+        log.info(f"Benchmark execution of query {funcname} finished in {time_elapsed} seconds.")
+
+        ret[DATABASE] = usage
+        log_metrics_summary(ret, usage)
+        log_metadata_summary(ret)
+        log.success("Throughput: {:02f} QPS".format(ret[THROUGHPUT]))
+        save_to_results(results, ret, workload, group, query, WITH_FINE_GRAINED_AUTHORIZATION)
+
+    vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
+    log.info("Running cleanup of auth queries")
+    ret = client.execute(queries=CLEANUP_AUTH_QUERIES)
+    vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
+
+
+def run_isolated_workload_without_authorization(vendor_runner, client, queries, group, workload, results):
+    log.init("Running isolated workload without authorization")
+    for query, funcname in queries[group]:
+        log.init(
+            "Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITHOUT_FINE_GRAINED_AUTHORIZATION),
+        )
+        func = getattr(workload, funcname)
+        count = get_query_cache_count(
+            vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
+        )
+
+        # Benchmark run.
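+        # For each query: log a sample, start a database instance keyed for RSS
+        # tracking, warm it up, then execute the timed run with the worker pool.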
+        sample_query = get_queries(func, 1)[0][0]
+        log.info("Sample query:{}".format(sample_query))
+        log.log(
+            "Executing benchmark with {} queries that should yield a single-threaded runtime of {} seconds.".format(
+                count, benchmark_context.single_threaded_runtime_sec
+            )
+        )
+        log.log("Queries are executed using {} concurrent clients".format(benchmark_context.num_workers_for_benchmark))
+
+        start_time = time.time()
+        rss_db = workload.NAME + workload.get_variant() + "_" + "_" + benchmark_context.mode + "_" + query
+        vendor_runner.start_db(rss_db)
+        warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
+        log.init("Executing benchmark queries...")
+        ret = client.execute(
+            queries=get_queries(func, count),
+            num_workers=benchmark_context.num_workers_for_benchmark,
+            time_dependent_execution=benchmark_context.time_dependent_execution,
+        )[0]
+
+        time_elapsed = time.time() - start_time
+
+        log.info(f"Benchmark execution of query {funcname} finished in {time_elapsed} seconds.")
+        usage = vendor_runner.stop_db(rss_db)
+
+        ret[DATABASE] = usage
+        log_output_summary(benchmark_context, ret, usage, funcname, sample_query)
+
+        save_to_results(results, ret, workload, group, query, WITHOUT_FINE_GRAINED_AUTHORIZATION)
+
+
+def setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload, storage_mode):
+    vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT)
+    log.info("Executing database index setup")
+    start_time = time.time()
+
+    if generated_queries:
+        client.execute(queries=workload.indexes_generator(), num_workers=1)
+        log.info("Finished setting up indexes.")
+        log.info("Started importing dataset")
+        import_results = client.execute(queries=generated_queries, num_workers=benchmark_context.num_workers_for_import)
+    else:
+        log.info("Using workload information for importing dataset and creating indices")
+        log.info("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
+        workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
+        imported = workload.custom_import()
+        if not imported:
+            client.execute(file_path=workload.get_index(), num_workers=1)
+            log.info("Finished setting up indexes.")
+            log.info("Started importing dataset")
+            if storage_mode == ON_DISK_TRANSACTIONAL:
+                import_results = client.execute(file_path=workload.get_node_file(), num_workers=1)
+                import_results = client.execute(file_path=workload.get_edge_file(), num_workers=1)
+            else:
+                import_results = client.execute(
+                    file_path=workload.get_file(), num_workers=benchmark_context.num_workers_for_import
+                )
+        else:
+            log.info("Custom import executed")

+    log.info(f"Finished importing dataset in {time.time() - start_time}s")
+    rss_usage = vendor_runner.stop_db_init(VENDOR_RUNNER_IMPORT)
+
+    return import_results, rss_usage
+
+
+def run_target_workload(benchmark_context, workload, bench_queries, vendor_runner, client, results, storage_mode):
+    generated_queries = workload.dataset_generator()
+    import_results, rss_usage = setup_indices_and_import_dataset(
+        client, vendor_runner, generated_queries, workload, storage_mode
+    )
+    save_import_results(workload, results, import_results, rss_usage)
+
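+    # Dispatch every query group according to the benchmark mode: mixed,
+    # realistic, or isolated (the latter with and without authorization).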
+    for group in sorted(bench_queries.keys()):
+        log.init(f"\nRunning benchmark in {benchmark_context.mode} workload mode for {group} group")
+        if benchmark_context.mode == BENCHMARK_MODE_MIXED:
+            mixed_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context, results)
+        elif benchmark_context.mode == BENCHMARK_MODE_REALISTIC:
+            realistic_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context, results)
+        else:
+            run_isolated_workload_without_authorization(vendor_runner, client, bench_queries, group, workload, results)
+
+        if benchmark_context.no_authorization:
+            run_isolated_workload_with_authorization(vendor_runner, client, bench_queries, group, workload, results)
+
+
+# TODO: (andi) Reorder functions in top-down fashion to improve readability
+def run_target_workloads(benchmark_context, target_workloads, bench_results):
+    for workload, bench_queries in target_workloads:
+        log.info(f"Started running {str(workload.NAME)} workload")
+
+        benchmark_context.set_active_workload(workload.NAME)
+        benchmark_context.set_active_variant(workload.get_variant())
+
+        if workload.is_disk_workload() and benchmark_context.export_results_on_disk_txn:
+            run_on_disk_transactional_benchmark(benchmark_context, workload, bench_queries, bench_results.disk_results)
+        else:
+            run_in_memory_transactional_benchmark(
+                benchmark_context, workload, bench_queries, bench_results.in_memory_txn_results
+            )
+
+            if benchmark_context.export_results_in_memory_analytical:
+                run_in_memory_analytical_benchmark(
+                    benchmark_context, workload, bench_queries, bench_results.in_memory_analytical_results
+                )
+
+
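+# Non-default storage modes are prepared up front: start the database, execute
+# the mode's setup queries, and stop it again before the workload run.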
+def run_on_disk_transactional_benchmark(benchmark_context, workload, bench_queries, disk_results):
+    log.info(f"Running benchmarks for {ON_DISK_TRANSACTIONAL} storage mode.")
+    disk_vendor_runner, disk_client = client_runner_factory(benchmark_context)
+    disk_vendor_runner.start_db(DISK_PREPARATION_RSS)
+    disk_client.execute(queries=SETUP_DISK_STORAGE)
+    disk_vendor_runner.stop_db(DISK_PREPARATION_RSS)
+    run_target_workload(
+        benchmark_context, workload, bench_queries, disk_vendor_runner, disk_client, disk_results, ON_DISK_TRANSACTIONAL
+    )
+    log.info(f"Finished running benchmarks for {ON_DISK_TRANSACTIONAL} storage mode.")
+
+
+def run_in_memory_analytical_benchmark(benchmark_context, workload, bench_queries, in_memory_analytical_results):
+    log.info(f"Running benchmarks for {IN_MEMORY_ANALYTICAL} storage mode.")
+    in_memory_analytical_vendor_runner, in_memory_analytical_client = client_runner_factory(benchmark_context)
+    in_memory_analytical_vendor_runner.start_db(IN_MEMORY_ANALYTICAL_RSS)
+    in_memory_analytical_client.execute(queries=SETUP_IN_MEMORY_ANALYTICAL_STORAGE_MODE)
+    in_memory_analytical_vendor_runner.stop_db(IN_MEMORY_ANALYTICAL_RSS)
+    run_target_workload(
+        benchmark_context,
+        workload,
+        bench_queries,
+        in_memory_analytical_vendor_runner,
+        in_memory_analytical_client,
+        in_memory_analytical_results,
+        IN_MEMORY_ANALYTICAL,
+    )
+    log.info(f"Finished running benchmarks for {IN_MEMORY_ANALYTICAL} storage mode.")
+
+
+def run_in_memory_transactional_benchmark(benchmark_context, workload, bench_queries, in_memory_txn_results):
+    log.info(f"Running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.")
+    in_memory_txn_vendor_runner, in_memory_txn_client = client_runner_factory(benchmark_context)
+    run_target_workload(
+        benchmark_context,
+        workload,
+        bench_queries,
+        in_memory_txn_vendor_runner,
+        in_memory_txn_client,
+        in_memory_txn_results,
+        IN_MEMORY_TRANSACTIONAL,
+    )
+    log.info(f"Finished running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.")
+
+
+def client_runner_factory(benchmark_context):
+    vendor_runner = runners.BaseRunner.create(benchmark_context=benchmark_context)
+    vendor_runner.clean_db()
+    log.log("Database cleaned from any previous data")
+    client = vendor_runner.fetch_client()
+    return vendor_runner, client
+
+
+def validate_target_workloads(benchmark_context, target_workloads):
+    if len(target_workloads) == 0:
+        log.error("No workloads matched the pattern: " + str(benchmark_context.benchmark_target_workload))
+        log.error("Please check the pattern and workload NAME property, query group and query name.")
+        log.info("Currently available workloads: ")
+        log.log(helpers.list_available_workloads(benchmark_context.customer_workloads))
+        sys.exit(1)
+
+
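+# Per-storage-mode summary: throughput (QPS) and peak memory (MB) for each
+# executed query; RUN_CONFIGURATION and IMPORT entries are skipped.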
+def log_benchmark_summary(results: Dict, storage_mode):
+    log.log("\n")
+    log.summary(f"Benchmark summary of {storage_mode} mode")
+    log.log("-" * 120)
+    log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage"))
+    for dataset, variants in results.items():
+        if dataset == RUN_CONFIGURATION:
+            continue
+        for groups in variants.values():
+            for group, queries in groups.items():
+                if group == IMPORT:
+                    continue
+                for query, auth in queries.items():
+                    for value in auth.values():
+                        log.log("-" * 120)
+                        log.summary(
+                            "{:<20} {:>26.2f} QPS {:>27.2f} MB".format(
+                                query, value[THROUGHPUT], value[DATABASE][MEMORY] / (1024.0 * 1024.0)
+                            )
+                        )
+    log.log("-" * 120)
+
+
+def log_benchmark_arguments(benchmark_context):
+    log.init("Executing benchmark with following arguments: ")
+    for key, value in benchmark_context.__dict__.items():
+        log.log("{:<30} : {:<30}".format(str(key), str(value)))
+
+
 def log_metrics_summary(ret, usage):
     log.log("Executed {} queries in {} seconds.".format(ret[COUNT], ret[DURATION]))
     log.log("Queries have been retried {} times".format(ret[RETRIES]))
@@ -645,175 +831,6 @@ def log_output_summary(benchmark_context, ret, usage, funcname, sample_query):
     log.success("Throughput: {:02f} QPS\n\n".format(ret[THROUGHPUT]))


-def save_to_results(results, ret, workload, group, query, authorization_mode):
-    results_key = [
-        workload.NAME,
-        workload.get_variant(),
-        group,
-        query,
-        authorization_mode,
-    ]
-    results.set_value(*results_key, value=ret)
-
-
-def run_isolated_workload_with_authorization(vendor_runner, client, queries, group, workload):
-    log.init("Running isolated workload with authorization")
-
-    log.info("Running preprocess AUTH queries")
-    vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
-    client.execute(queries=SETUP_AUTH_QUERIES)
-    client.set_credentials(username=USERNAME, password=PASSWORD)
-    vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
-
-    for query, funcname in queries[group]:
-        log.init("Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION))
-        func = getattr(workload, funcname)
-        count = get_query_cache_count(
-            vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
-        )
-
-        vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
-        warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
-
-        ret = client.execute(
-            queries=get_queries(func, count),
-            num_workers=benchmark_context.num_workers_for_benchmark,
-        )[0]
-        usage = vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
-
-        ret[DATABASE] = usage
-        log_metrics_summary(ret, usage)
-        log_metadata_summary(ret)
-        log.success("Throughput: {:02f} QPS".format(ret[THROUGHPUT]))
-        save_to_results(results, ret, workload, group, query, WITH_FINE_GRAINED_AUTHORIZATION)
-
-    vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
-    log.info("Running cleanup of auth queries")
-    ret = client.execute(queries=CLEANUP_AUTH_QUERIES)
-    vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
-
-
-def run_isolated_workload_without_authorization(vendor_runner, client, queries, group, workload):
-    log.init("Running isolated workload without authorization")
-    for query, funcname in queries[group]:
-        log.init(
-            "Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITHOUT_FINE_GRAINED_AUTHORIZATION),
-        )
-        func = getattr(workload, funcname)
-        count = get_query_cache_count(
-            vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
-        )
-
-        # Benchmark run.
-        sample_query = get_queries(func, 1)[0][0]
-        log.info("Sample query:{}".format(sample_query))
-        log.log(
-            "Executing benchmark with {} queries that should yield a single-threaded runtime of {} seconds.".format(
-                count, benchmark_context.single_threaded_runtime_sec
-            )
-        )
-        log.log("Queries are executed using {} concurrent clients".format(benchmark_context.num_workers_for_benchmark))
-
-        rss_db = workload.NAME + workload.get_variant() + "_" + "_" + benchmark_context.mode + "_" + query
-        vendor_runner.start_db(rss_db)
-        warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
-        log.init("Executing benchmark queries...")
-        ret = client.execute(
-            queries=get_queries(func, count),
-            num_workers=benchmark_context.num_workers_for_benchmark,
-            time_dependent_execution=benchmark_context.time_dependent_execution,
-        )[0]
-
-        log.info("Benchmark execution finished...")
-        usage = vendor_runner.stop_db(rss_db)
-
-        ret[DATABASE] = usage
-        log_output_summary(benchmark_context, ret, usage, funcname, sample_query)
-
-        save_to_results(results, ret, workload, group, query, WITHOUT_FINE_GRAINED_AUTHORIZATION)
-
-
-def setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload):
-    vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT)
-    log.info("Executing database index setup")
-
-    if generated_queries:
-        client.execute(queries=workload.indexes_generator(), num_workers=1)
-        log.info("Finished setting up indexes.")
-        log.info("Started importing dataset")
-        import_results = client.execute(queries=generated_queries, num_workers=benchmark_context.num_workers_for_import)
-    else:
-        log.info("Using workload information for importing dataset and creating indices")
-        log.info("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
-        workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
-        imported = workload.custom_import()
-        if not imported:
-            client.execute(file_path=workload.get_index(), num_workers=1)
-            log.info("Finished setting up indexes.")
-            log.info("Started importing dataset")
-            import_results = client.execute(
-                file_path=workload.get_file(), num_workers=benchmark_context.num_workers_for_import
-            )
-        else:
-            log.info("Custom import executed")
-
-    log.info("Finished importing dataset")
-    rss_usage = vendor_runner.stop_db_init(VENDOR_RUNNER_IMPORT)
-
-    return import_results, rss_usage
-
-
-def run_target_workload(benchmark_context, workload, bench_queries, vendor_runner, client):
-    generated_queries = workload.dataset_generator()
-    import_results, rss_usage = setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload)
-    save_import_results(workload, results, import_results, rss_usage)
-
-    for group in sorted(bench_queries.keys()):
-        log.init(f"\nRunning benchmark in {benchmark_context.mode} workload mode for {group} group")
-        if benchmark_context.mode == BENCHMARK_MODE_MIXED:
-            mixed_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context)
-        elif benchmark_context.mode == BENCHMARK_MODE_REALISTIC:
-            realistic_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context)
-        else:
-            run_isolated_workload_without_authorization(vendor_runner, client, bench_queries, group, workload)
-
-        if benchmark_context.no_authorization:
-            run_isolated_workload_with_authorization(vendor_runner, client, bench_queries, group, workload)
-
-
-# TODO: (andi) Reorder functions in top-down notion in order to improve readibility
-def run_target_workloads(benchmark_context, target_workloads):
-    for workload, bench_queries in target_workloads:
-        log.info(f"Started running {str(workload.NAME)} workload")
-
-        benchmark_context.set_active_workload(workload.NAME)
-        benchmark_context.set_active_variant(workload.get_variant())
-
-        log.info(f"Running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.")
-        in_memory_txn_vendor_runner, in_memory_txn_client = client_runner_factory(benchmark_context)
-        run_target_workload(
-            benchmark_context, workload, bench_queries, in_memory_txn_vendor_runner, in_memory_txn_client
-        )
-        log.info(f"Finished running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.")
-
-
-def client_runner_factory(benchmark_context):
-    vendor_runner = runners.BaseRunner.create(benchmark_context=benchmark_context)
-    vendor_runner.clean_db()
-    log.log("Database cleaned from any previous data")
-    client = vendor_runner.fetch_client()
-    return vendor_runner, client
-
-
-def validate_target_workloads(benchmark_context, target_workloads):
-    if len(target_workloads) == 0:
-        log.error("No workloads matched the pattern: " + str(benchmark_context.benchmark_target_workload))
-        log.error("Please check the pattern and workload NAME property, query group and query name.")
-        log.info("Currently available workloads: ")
-        log.log(helpers.list_available_workloads(benchmark_context.customer_workloads))
-        sys.exit(1)
-
-
 if __name__ == "__main__":
     args = parse_args()
     sanitize_args(args)
@@ -833,6 +850,8 @@ if __name__ == "__main__":
         query_count_lower_bound=args.query_count_lower_bound,
         no_load_query_counts=args.no_load_query_counts,
         export_results=args.export_results,
+        export_results_in_memory_analytical=args.export_results_in_memory_analytical,
+        export_results_on_disk_txn=args.export_results_on_disk_txn,
         temporary_directory=temp_dir.absolute(),
         workload_mixed=args.workload_mixed,
         workload_realistic=args.workload_realistic,
@@ -842,8 +861,6 @@ if __name__ == "__main__":
         no_authorization=args.no_authorization,
         customer_workloads=args.customer_workloads,
         vendor_args=vendor_specific_args,
-        disk_storage=args.disk_storage,
-        in_memory_analytical=args.in_memory_analytical,
     )

     log_benchmark_arguments(benchmark_context)
@@ -854,7 +871,7 @@ if __name__ == "__main__":
     log.log("Cache folder in use: " + cache.get_default_cache_directory())
     config = setup_cache_config(benchmark_context, cache)

-    run_config = {
+    in_memory_txn_run_config = {
         VENDOR: benchmark_context.vendor_name,
         CONDITION: benchmark_context.warm_up,
         NUM_WORKERS_FOR_BENCHMARK: benchmark_context.num_workers_for_benchmark,
@@ -862,10 +879,19 @@ if __name__ == "__main__":
         BENCHMARK_MODE: benchmark_context.mode,
         BENCHMARK_MODE_CONFIG: benchmark_context.mode_config,
         PLATFORM: platform.platform(),
+        STORAGE_MODE: IN_MEMORY_TRANSACTIONAL,
     }

-    results = helpers.RecursiveDict()
-    results.set_value(RUN_CONFIGURATION, value=run_config)
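+    # The analytical and disk run configurations are copies of the
+    # transactional one, differing only in STORAGE_MODE.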
+    in_memory_analytical_run_config = deepcopy(in_memory_txn_run_config)
+    in_memory_analytical_run_config[STORAGE_MODE] = IN_MEMORY_ANALYTICAL
+
+    on_disk_transactional_run_config = deepcopy(in_memory_txn_run_config)
+    on_disk_transactional_run_config[STORAGE_MODE] = ON_DISK_TRANSACTIONAL
+
+    bench_results = BenchmarkResults()
+    bench_results.in_memory_txn_results.set_value(RUN_CONFIGURATION, value=in_memory_txn_run_config)
+    bench_results.in_memory_analytical_results.set_value(RUN_CONFIGURATION, value=in_memory_analytical_run_config)
+    bench_results.disk_results.set_value(RUN_CONFIGURATION, value=on_disk_transactional_run_config)

     available_workloads = helpers.get_available_workloads(benchmark_context.customer_workloads)

@@ -876,13 +902,22 @@ if __name__ == "__main__":
     )

     validate_target_workloads(benchmark_context, target_workloads)
-    run_target_workloads(benchmark_context, target_workloads)
+    run_target_workloads(benchmark_context, target_workloads, bench_results)

     if not benchmark_context.no_save_query_counts:
         cache.save_config(config)

-    log_benchmark_summary(results.get_data())
-
+    log_benchmark_summary(bench_results.in_memory_txn_results.get_data(), IN_MEMORY_TRANSACTIONAL)
     if benchmark_context.export_results:
         with open(benchmark_context.export_results, "w") as f:
-            json.dump(results.get_data(), f)
+            json.dump(bench_results.in_memory_txn_results.get_data(), f)
+
+    log_benchmark_summary(bench_results.in_memory_analytical_results.get_data(), IN_MEMORY_ANALYTICAL)
+    if benchmark_context.export_results_in_memory_analytical:
+        with open(benchmark_context.export_results_in_memory_analytical, "w") as f:
+            json.dump(bench_results.in_memory_analytical_results.get_data(), f)
+
+    log_benchmark_summary(bench_results.disk_results.get_data(), ON_DISK_TRANSACTIONAL)
+    if benchmark_context.export_results_on_disk_txn:
+        with open(benchmark_context.export_results_on_disk_txn, "w") as f:
+            json.dump(bench_results.disk_results.get_data(), f)
diff --git a/tests/mgbench/benchmark_context.py b/tests/mgbench/benchmark_context.py
index aa1c098bd..034c2c93a 100644
--- a/tests/mgbench/benchmark_context.py
+++ b/tests/mgbench/benchmark_context.py
@@ -34,6 +34,8 @@ class BenchmarkContext:
         no_load_query_counts: bool = False,
         no_save_query_counts: bool = False,
         export_results: str = None,
+        export_results_in_memory_analytical: str = None,
+        export_results_on_disk_txn: str = None,
         temporary_directory: str = None,
         workload_mixed: str = None,  # Default mode is isolated, mixed None
         workload_realistic: str = None,  # Default mode is isolated, realistic None
@@ -43,8 +45,6 @@ class BenchmarkContext:
         no_authorization: bool = True,
         customer_workloads: str = None,
         vendor_args: dict = {},
-        disk_storage: bool = False,
-        in_memory_analytical: bool = False,
     ) -> None:
         self.benchmark_target_workload = benchmark_target_workload
         self.vendor_binary = vendor_binary
@@ -57,6 +57,8 @@ class BenchmarkContext:
         self.no_load_query_counts = no_load_query_counts
         self.no_save_query_counts = no_save_query_counts
         self.export_results = export_results
+        self.export_results_in_memory_analytical = export_results_in_memory_analytical
+        self.export_results_on_disk_txn = export_results_on_disk_txn
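+        # These paths double as switches: the corresponding storage mode is
+        # only benchmarked when its export path is set.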
         self.temporary_directory = temporary_directory

         assert (
@@ -81,8 +83,6 @@ class BenchmarkContext:
         self.vendor_args = vendor_args
         self.active_workload = None
         self.active_variant = None
-        self.disk_storage = disk_storage
-        self.in_memory_analytical = in_memory_analytical

     def set_active_workload(self, workload: str) -> None:
         self.active_workload = workload
diff --git a/tests/mgbench/benchmark_results.py b/tests/mgbench/benchmark_results.py
new file mode 100644
index 000000000..c82cf2d3d
--- /dev/null
+++ b/tests/mgbench/benchmark_results.py
@@ -0,0 +1,19 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import helpers
+
+
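+# One result container per storage mode, so each mode can be exported to its own file.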
+class BenchmarkResults:
+    def __init__(self) -> None:
+        self.in_memory_txn_results = helpers.RecursiveDict()
+        self.in_memory_analytical_results = helpers.RecursiveDict()
+        self.disk_results = helpers.RecursiveDict()
diff --git a/tests/mgbench/constants.py b/tests/mgbench/constants.py
new file mode 100644
index 000000000..211d9a413
--- /dev/null
+++ b/tests/mgbench/constants.py
@@ -0,0 +1,56 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+WITH_FINE_GRAINED_AUTHORIZATION = "with_fine_grained_authorization"
+WITHOUT_FINE_GRAINED_AUTHORIZATION = "without_fine_grained_authorization"
+RUN_CONFIGURATION = "__run_configuration__"
+IMPORT = "__import__"
+THROUGHPUT = "throughput"
+DATABASE = "database"
+MEMORY = "memory"
+VENDOR = "vendor"
+CONDITION = "condition"
+NUM_WORKERS_FOR_BENCHMARK = "num_workers_for_benchmark"
+SINGLE_THREADED_RUNTIME_SEC = "single_threaded_runtime_sec"
+BENCHMARK_MODE = "benchmark_mode"
+BENCHMARK_MODE_CONFIG = "benchmark_mode_config"
+PLATFORM = "platform"
+COUNT = "count"
+DURATION = "duration"
+NUM_WORKERS = "num_workers"
+CPU = "cpu"
+MEMORY = "memory"
+VENDOR_RUNNER_IMPORT = "import"
+VENDOR_RUNNER_AUTHORIZATION = "authorization"
+CLIENT = "client"
+DATABASE = "database"
+CUSTOM_LOAD = "custom_load"
+RETRIES = "retries"
+DOCKER = "docker"
+METADATA = "metadata"
+LATENCY_STATS = "latency_stats"
+ITERATIONS = "iterations"
+USERNAME = "user"
+PASSWORD = "test"
+DATABASE_CONDITION_HOT = "hot"
+DATABASE_CONDITION_VULCANIC = "vulcanic"
+WRITE_TYPE_QUERY = "write"
+READ_TYPE_QUERY = "read"
+UPDATE_TYPE_QUERY = "update"
+ANALYTICAL_TYPE_QUERY = "analytical"
+QUERY = "query"
+CACHE = "cache"
+DISK_PREPARATION_RSS = "disk_storage_preparation"
+IN_MEMORY_ANALYTICAL_RSS = "in_memory_analytical_preparation"
+STORAGE_MODE = "storage_mode"
+IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL"
+IN_MEMORY_ANALYTICAL = "IN_MEMORY_ANALYTICAL"
+ON_DISK_TRANSACTIONAL = "ON_DISK_TRANSACTIONAL"
diff --git a/tests/mgbench/helpers.py b/tests/mgbench/helpers.py
index b860440f9..11d78fea5 100644
--- a/tests/mgbench/helpers.py
+++ b/tests/mgbench/helpers.py
@@ -20,10 +20,10 @@ import subprocess
 import sys
 from pathlib import Path

+import log
 import workloads
 from benchmark_context import BenchmarkContext
 from workloads import *
-from workloads import base

 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -116,7 +116,7 @@ def get_available_workloads(customer_workloads: str = None) -> dict:
             if key.startswith("_"):
                 continue
             base_class = getattr(module, key)
-            if not inspect.isclass(base_class) or not issubclass(base_class, base.Workload):
+            if not inspect.isclass(base_class) or not issubclass(base_class, workloads.base.Workload):
                 continue
             queries = collections.defaultdict(list)
             for funcname in dir(base_class):
@@ -137,7 +137,7 @@ def get_available_workloads(customer_workloads: str = None) -> dict:
             if key.startswith("_"):
                 continue
             base_class = getattr(dataset_to_use, key)
-            if not inspect.isclass(base_class) or not issubclass(base_class, base.Workload):
+            if not inspect.isclass(base_class) or not issubclass(base_class, workloads.base.Workload):
                 continue
             queries = collections.defaultdict(list)
             for funcname in dir(base_class):
diff --git a/tests/mgbench/log.py b/tests/mgbench/log.py
index 64b648937..8b8d8fad1 100644
--- a/tests/mgbench/log.py
+++ b/tests/mgbench/log.py
@@ -10,6 +10,23 @@
 # licenses/APL.txt.

 import logging
+from typing import Dict
+
+from constants import (
+    COUNT,
+    CPU,
+    DATABASE,
+    DOCKER,
+    DURATION,
+    IMPORT,
+    ITERATIONS,
+    LATENCY_STATS,
+    MEMORY,
+    METADATA,
+    RETRIES,
+    RUN_CONFIGURATION,
+    THROUGHPUT,
+)

 COLOR_GRAY = 0
 COLOR_RED = 1
diff --git a/tests/mgbench/runners.py b/tests/mgbench/runners.py
index acd868377..e1f52b696 100644
--- a/tests/mgbench/runners.py
+++ b/tests/mgbench/runners.py
@@ -455,7 +455,6 @@ class Memgraph(BaseRunner):
             raise Exception("The database process died prematurely!")
         _wait_for_server_socket(self._bolt_port)
         ret = self._proc_mg.poll()
-        assert ret is None, "The database process died prematurely " "({})!".format(ret)

     def _cleanup(self):
         if self._proc_mg is None:
@@ -479,7 +478,6 @@ class Memgraph(BaseRunner):
         self._stop_event.set()
         self.dump_rss(workload)
         ret, usage = self._cleanup()
-        assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
         return usage

     def start_db(self, workload):
@@ -495,7 +493,6 @@ class Memgraph(BaseRunner):
         self._stop_event.set()
         self.dump_rss(workload)
         ret, usage = self._cleanup()
-        assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
         return usage

     def clean_db(self):
@@ -644,7 +641,6 @@ class Neo4j(BaseRunner):
             self._rss.clear()
             p.start()

-        # Start DB
         self._start()

         if self._performance_tracking:
@@ -657,7 +653,6 @@ class Neo4j(BaseRunner):
         self.dump_rss(workload)
         ret, usage = self._cleanup()
         self.dump_db(path=self._neo4j_dump.parent)
-        assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
         return usage

     def start_db(self, workload):
@@ -689,7 +684,6 @@ class Neo4j(BaseRunner):
         self.get_memory_usage("stop_" + workload)
         self.dump_rss(workload)
         ret, usage = self._cleanup()
-        assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
         return usage

     def dump_db(self, path):
diff --git a/tests/mgbench/workloads/base.py b/tests/mgbench/workloads/base.py
index 12ebe4002..5264dcba9 100644
--- a/tests/mgbench/workloads/base.py
+++ b/tests/mgbench/workloads/base.py
@@ -9,8 +9,7 @@
 # by the Apache License, Version 2.0, included in the file
 # licenses/APL.txt.

-from abc import ABC, abstractclassmethod
-from pathlib import Path
+from abc import ABC

 import helpers
 from benchmark_context import BenchmarkContext
@@ -18,6 +17,10 @@ from benchmark_context import BenchmarkContext

 # Base dataset class used as a template to create each individual dataset. All
 # common logic is handled here.
+# This base class also serves disk-storage workloads. In that case there are two
+# dataset files, one for nodes and one for edges; the index file is managed the
+# same way as for in-memory workloads. For a disk workload, calling get_file()
+# raises an exception; use get_node_file() and get_edge_file() instead.
 class Workload(ABC):
     # Name of the workload/dataset.
     NAME = ""
@@ -28,9 +31,13 @@ class Workload(ABC):

     # List of local files that should be used to import the dataset.
     LOCAL_FILE = None
+    LOCAL_FILE_NODES = None
+    LOCAL_FILE_EDGES = None

     # URLs of remote dataset files that should be used to import the dataset, compressed in gz format.
     URL_FILE = None
+    URL_FILE_NODES = None
+    URL_FILE_EDGES = None

     # Index files
     LOCAL_INDEX_FILE = None
@@ -49,9 +56,11 @@ class Workload(ABC):
         generator_prerequisite = "dataset_generator" in cls.__dict__
         generator_indexes_prerequisite = "indexes_generator" in cls.__dict__
         custom_import_prerequisite = "custom_import" in cls.__dict__
-        basic_import_prerequisite = ("LOCAL_FILE" in cls.__dict__ or "URL_FILE" in cls.__dict__) and (
-            "LOCAL_INDEX_FILE" in cls.__dict__ or "URL_INDEX_FILE" in cls.__dict__
-        )
+        basic_import_prerequisite = (
+            "LOCAL_FILE" in cls.__dict__
+            or "URL_FILE" in cls.__dict__
+            or ("URL_FILE_NODES" in cls.__dict__ and "URL_FILE_EDGES" in cls.__dict__)
+        ) and ("LOCAL_INDEX_FILE" in cls.__dict__ or "URL_INDEX_FILE" in cls.__dict__)

         if not name_prerequisite:
             raise ValueError(
@@ -87,7 +96,7 @@ class Workload(ABC):

         return super().__init_subclass__()

-    def __init__(self, variant: str = None, benchmark_context: BenchmarkContext = None):
+    def __init__(self, variant: str = None, benchmark_context: BenchmarkContext = None, disk_workload: bool = False):
         """
         Accepts a `variant` variable that indicates which variant
         of the dataset should be executed
         """
@@ -96,63 +105,110 @@ class Workload(ABC):
         self._variant = variant
         self._vendor = benchmark_context.vendor_name
         self._file = None
+        self._node_file = None
+        self._edge_file = None
         self._file_index = None

+        self.disk_workload: bool = disk_workload
+
         if self.NAME == "":
             raise ValueError("Give your workload a name, by setting self.NAME")

-        if variant is None:
-            variant = self.DEFAULT_VARIANT
-        if variant not in self.VARIANTS:
-            raise ValueError("Invalid test variant!")
-        if (self.LOCAL_FILE and variant not in self.LOCAL_FILE) and (self.URL_FILE and variant not in self.URL_FILE):
-            raise ValueError("The variant doesn't have a defined URL or LOCAL file path!")
-        if variant not in self.SIZES:
-            raise ValueError("The variant doesn't have a defined dataset " "size!")
+        self._validate_variant_argument()
+        self._validate_vendor_argument()

-        if (self.LOCAL_INDEX_FILE and self._vendor not in self.LOCAL_INDEX_FILE) and (
-            self.URL_INDEX_FILE and self._vendor not in self.URL_INDEX_FILE
-        ):
-            raise ValueError("Vendor does not have INDEX for dataset!")
+        self._set_local_files()
+        self._set_url_files()
-        if self.LOCAL_FILE is not None:
-            self._local_file = self.LOCAL_FILE.get(variant, None)
-        else:
-            self._local_file = None
-
-        if self.URL_FILE is not None:
-            self._url_file = self.URL_FILE.get(variant, None)
-        else:
-            self._url_file = None
-
-        if self.LOCAL_INDEX_FILE is not None:
-            self._local_index = self.LOCAL_INDEX_FILE.get(self._vendor, None)
-        else:
-            self._local_index = None
-
-        if self.URL_INDEX_FILE is not None:
-            self._url_index = self.URL_INDEX_FILE.get(self._vendor, None)
-        else:
-            self._url_index = None
+        self._set_local_index_file()
+        self._set_url_index_file()

         self._size = self.SIZES[variant]
         if "vertices" in self._size or "edges" in self._size:
             self._num_vertices = self._size["vertices"]
             self._num_edges = self._size["edges"]

+    def _validate_variant_argument(self) -> None:
+        if self._variant is None:
+            # Fall back to the default variant when none is given.
+            self._variant = self.DEFAULT_VARIANT
+
+        if self._variant not in self.VARIANTS:
+            raise ValueError("Invalid test variant!")
+
+        if self._variant not in self.SIZES:
+            raise ValueError("The variant doesn't have a defined dataset " "size!")
+
+        if not self.disk_workload:
+            if (self.LOCAL_FILE and self._variant not in self.LOCAL_FILE) and (
+                self.URL_FILE and self._variant not in self.URL_FILE
+            ):
+                raise ValueError("The variant doesn't have a defined URL or LOCAL file path!")
+        else:
+            if (self.LOCAL_FILE_NODES and self._variant not in self.LOCAL_FILE_NODES) and (
+                self.URL_FILE_NODES and self._variant not in self.URL_FILE_NODES
+            ):
+                raise ValueError("The variant doesn't have a defined URL or LOCAL file path for nodes!")
+            if (self.LOCAL_FILE_EDGES and self._variant not in self.LOCAL_FILE_EDGES) and (
+                self.URL_FILE_EDGES and self._variant not in self.URL_FILE_EDGES
+            ):
+                raise ValueError("The variant doesn't have a defined URL or LOCAL file path for edges!")
+
+    def _validate_vendor_argument(self) -> None:
+        if (self.LOCAL_INDEX_FILE and self._vendor not in self.LOCAL_INDEX_FILE) and (
+            self.URL_INDEX_FILE and self._vendor not in self.URL_INDEX_FILE
+        ):
+            raise ValueError("Vendor does not have INDEX for dataset!")
+
+    def _set_local_files(self) -> None:
+        if not self.disk_workload:
+            if self.LOCAL_FILE is not None:
+                self._local_file = self.LOCAL_FILE.get(self._variant, None)
+            else:
+                self._local_file = None
+        else:
+            if self.LOCAL_FILE_NODES is not None:
+                self._local_file_nodes = self.LOCAL_FILE_NODES.get(self._variant, None)
+            else:
+                self._local_file_nodes = None
+
+            if self.LOCAL_FILE_EDGES is not None:
+                self._local_file_edges = self.LOCAL_FILE_EDGES.get(self._variant, None)
+            else:
+                self._local_file_edges = None
+
+    def _set_url_files(self) -> None:
+        if not self.disk_workload:
+            if self.URL_FILE is not None:
+                self._url_file = self.URL_FILE.get(self._variant, None)
+            else:
+                self._url_file = None
+        else:
+            if self.URL_FILE_NODES is not None:
+                self._url_file_nodes = self.URL_FILE_NODES.get(self._variant, None)
+            else:
+                self._url_file_nodes = None
+            if self.URL_FILE_EDGES is not None:
+                self._url_file_edges = self.URL_FILE_EDGES.get(self._variant, None)
+            else:
+                self._url_file_edges = None
+
+    def _set_local_index_file(self) -> None:
+        if self.LOCAL_INDEX_FILE is not None:
+            self._local_index = self.LOCAL_INDEX_FILE.get(self._vendor, None)
+        else:
+            self._local_index = None
+
+    def _set_url_index_file(self) -> None:
+        if self.URL_INDEX_FILE is not None:
+            self._url_index = self.URL_INDEX_FILE.get(self._vendor, None)
+        else:
+            self._url_index = None
+
     def prepare(self, directory):
-        if self._local_file is not None:
-            print("Using local dataset file:", self._local_file)
-            self._file = self._local_file
-        elif self._url_file is not None:
-            cached_input, exists = directory.get_file("dataset.cypher")
-            if not exists:
-                print("Downloading dataset file:", self._url_file)
-                downloaded_file = helpers.download_file(self._url_file, directory.get_path())
-                print("Unpacking and caching file:", downloaded_file)
-                helpers.unpack_gz_and_move_file(downloaded_file, cached_input)
-            print("Using cached dataset file:", cached_input)
-            self._file = cached_input
+        if not self.disk_workload:
+            self._prepare_dataset_for_in_memory_workload(directory)
+        else:
+            self._prepare_dataset_for_on_disk_workload(directory)

         if self._local_index is not None:
             print("Using local index file:", self._local_index)
@@ -167,6 +223,55 @@ class Workload(ABC):
             print("Using cached index file:", cached_index)
             self._file_index = cached_index

+    def _prepare_dataset_for_on_disk_workload(self, directory):
+        self._prepare_nodes_for_on_disk_workload(directory)
+        self._prepare_edges_for_on_disk_workload(directory)
+
+    def _prepare_nodes_for_on_disk_workload(self, directory):
+        if self._local_file_nodes is not None:
+            print("Using local dataset file for nodes:", self._local_file_nodes)
+            self._node_file = self._local_file_nodes
+        elif self._url_file_nodes is not None:
+            cached_input, exists = directory.get_file("dataset_nodes.cypher")
+            if not exists:
+                print("Downloading dataset file for nodes:", self._url_file_nodes)
+                downloaded_file = helpers.download_file(self._url_file_nodes, directory.get_path())
+                print("Unpacking and caching file for nodes:", downloaded_file)
+                helpers.unpack_gz_and_move_file(downloaded_file, cached_input)
+            print("Using cached dataset file for nodes:", cached_input)
+            self._node_file = cached_input
+
+    def _prepare_edges_for_on_disk_workload(self, directory):
+        if self._local_file_edges is not None:
+            print("Using local dataset file for edges:", self._local_file_edges)
+            self._edge_file = self._local_file_edges
+        elif self._url_file_edges is not None:
+            cached_input, exists = directory.get_file("dataset_edges.cypher")
+            if not exists:
+                print("Downloading dataset file for edges:", self._url_file_edges)
+                downloaded_file = helpers.download_file(self._url_file_edges, directory.get_path())
+                print("Unpacking and caching file for edges:", downloaded_file)
+                helpers.unpack_gz_and_move_file(downloaded_file, cached_input)
+            print("Using cached dataset file for edges:", cached_input)
+            self._edge_file = cached_input
+
+    def _prepare_dataset_for_in_memory_workload(self, directory):
+        if self._local_file is not None:
+            print("Using local dataset file:", self._local_file)
+            self._file = self._local_file
+        elif self._url_file is not None:
+            cached_input, exists = directory.get_file("dataset.cypher")
+            if not exists:
+                print("Downloading dataset file:", self._url_file)
+                downloaded_file = helpers.download_file(self._url_file, directory.get_path())
+                print("Unpacking and caching file:", downloaded_file)
+                helpers.unpack_gz_and_move_file(downloaded_file, cached_input)
+            print("Using cached dataset file:", cached_input)
+            self._file = cached_input
+
+    def is_disk_workload(self):
+        return self.disk_workload
+
     def get_variant(self):
         """Returns the current variant of the dataset."""
         return self._variant
@@ -179,8 +284,25 @@ class Workload(ABC):
         """
         Returns path to the file that contains dataset creation queries.
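+        Not available for disk workloads; use get_node_file() and get_edge_file() instead.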
""" + if self.disk_workload: + raise Exception("get_file method should not be called for disk storage") + return self._file + def get_node_file(self): + """Returns path to the file that contains dataset creation queries for nodes.""" + if not self.disk_workload: + raise Exception("get_node_file method should be called only for disk storage") + + return self._node_file + + def get_edge_file(self): + """Returns path to the file that contains dataset creation queries for edges.""" + if not self.disk_workload: + raise Exception("get_edge_file method should be called only for disk storage") + + return self._edge_file + def get_size(self): """Returns number of vertices/edges for the current variant.""" return self._size diff --git a/tests/mgbench/workloads/disk_pokec.py b/tests/mgbench/workloads/disk_pokec.py new file mode 100644 index 000000000..f19110a0c --- /dev/null +++ b/tests/mgbench/workloads/disk_pokec.py @@ -0,0 +1,449 @@ +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import random + +from benchmark_context import BenchmarkContext +from workloads.base import Workload +from workloads.importers.disk_importer_pokec import ImporterPokec + + +class Pokec(Workload): + NAME = "pokec_disk" + VARIANTS = ["small", "medium", "large"] + DEFAULT_VARIANT = "small" + FILE = None + + URL_FILE_NODES = { + "small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_small_import_nodes.cypher", + "medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_medium_import_nodes.cypher", + "large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_large_nodes.setup.cypher.gz", + } + + URL_FILE_EDGES = { + "small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_small_import_edges.cypher", + "medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_medium_import_edges.cypher", + "large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_large_edges.setup.cypher.gz", + } + + SIZES = { + "small": {"vertices": 10000, "edges": 121716}, + "medium": {"vertices": 100000, "edges": 1768515}, + "large": {"vertices": 1632803, "edges": 30622564}, + } + + URL_INDEX_FILE = { + "memgraph": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/memgraph.cypher", + "neo4j": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/neo4j.cypher", + } + + PROPERTIES_ON_EDGES = False + + def __init__(self, variant: str = None, benchmark_context: BenchmarkContext = None): + super().__init__(variant, benchmark_context=benchmark_context, disk_workload=True) + + def custom_import(self) -> bool: + importer = ImporterPokec( + benchmark_context=self.benchmark_context, + dataset_name=self.NAME, + index_file=self._file_index, + dataset_nodes_file=self._node_file, + dataset_edges_file=self._edge_file, + variant=self._variant, + ) + return importer.execute_import() + + # 
Helpers used to generate the queries + def _get_random_vertex(self): + # All vertices in the Pokec dataset have an ID in the range + # [1, _num_vertices]. + return random.randint(1, self._num_vertices) + + def _get_random_from_to(self): + vertex_from = self._get_random_vertex() + vertex_to = vertex_from + while vertex_to == vertex_from: + vertex_to = self._get_random_vertex() + return (vertex_from, vertex_to) + + # Arango benchmarks + + # OK + def benchmark__arango__single_vertex_read(self): + return ("MATCH (n:User {id : $id}) RETURN n", {"id": self._get_random_vertex()}) + + # OK + def benchmark__arango__single_vertex_write(self): + return ( + "CREATE (n:UserTemp {id : $id}) RETURN n", + {"id": random.randint(1, self._num_vertices * 10)}, + ) + + # OK + def benchmark__arango__single_edge_write(self): + vertex_from, vertex_to = self._get_random_from_to() + return ( + "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "CREATE (n)-[e:Temp]->(m) RETURN e", + {"from": vertex_from, "to": vertex_to}, + ) + + # OK + def benchmark__arango__aggregate(self): + return ("MATCH (n:User) RETURN n.age, COUNT(*)", {}) + + # OK + def benchmark__arango__aggregate_with_distinct(self): + return ("MATCH (n:User) RETURN COUNT(DISTINCT n.age)", {}) + + # OK + def benchmark__arango__aggregate_with_filter(self): + return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {}) + + # NOT OK + # def benchmark__arango__expansion_1(self): + # return ( + # "MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__expansion_1_with_filter(self): + # return ( + # "MATCH (s:User {id: $id})-->(n:User) " "WHERE n.age >= 18 " "RETURN n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__expansion_2(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__expansion_2_with_filter(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__expansion_3(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->()-->(n:User) " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__expansion_3_with_filter(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__expansion_4(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__expansion_4_with_filter(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__neighbours_2(self): + # return ( + # "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__neighbours_2_with_filter(self): + # return ( + # "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__neighbours_2_with_data(self): + # return ( + # "MATCH 
(s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id, n", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__neighbours_2_with_data_and_filter(self): + # return ( + # "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id, n", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__arango__shortest_path(self): + # vertex_from, vertex_to = self._get_random_from_to() + # return ( + # "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " + # "MATCH p=(n)-[*bfs..15]->(m) " + # "RETURN extract(n in nodes(p) | n.id) AS path", + # {"from": vertex_from, "to": vertex_to}, + # ) + + # NOT OK + # def benchmark__arango__shortest_path_with_filter(self): + # vertex_from, vertex_to = self._get_random_from_to() + # return ( + # "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " + # "MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) " + # "RETURN extract(n in nodes(p) | n.id) AS path", + # {"from": vertex_from, "to": vertex_to}, + # ) + + # OK + def benchmark__arango__allshortest_paths(self): + vertex_from, vertex_to = self._get_random_from_to() + return ( + "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " + "MATCH p=(n)-[*allshortest 2 (r, n | 1) total_weight]->(m) " + "RETURN extract(n in nodes(p) | n.id) AS path", + {"from": vertex_from, "to": vertex_to}, + ) + + # Our benchmark queries + + # OK + def benchmark__create__edge(self): + vertex_from, vertex_to = self._get_random_from_to() + return ( + "MATCH (a:User {id: $from}), (b:User {id: $to}) " "CREATE (a)-[:TempEdge]->(b)", + {"from": vertex_from, "to": vertex_to}, + ) + + # OK + def benchmark__create__pattern(self): + return ("CREATE ()-[:TempEdge]->()", {}) + + # OK + def benchmark__create__vertex(self): + return ("CREATE ()", {}) + + # OK + def benchmark__create__vertex_big(self): + return ( + "CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, " + 'p3: "Here is some text that is not extremely short", ' + 'p4:"Short text", p5: 234.434, p6: 11.11, p7: false})', + {}, + ) + + # OK + def benchmark__aggregation__count(self): + return ("MATCH (n) RETURN count(n), count(n.age)", {}) + + # OK + def benchmark__aggregation__min_max_avg(self): + return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {}) + + # NOT OK + # def benchmark__match__pattern_cycle(self): + # return ( + # "MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__match__pattern_long(self): + # return ( + # "MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->" "(n3)-[e3]->(n4)<-[e4]-(n5) " "RETURN n5 LIMIT 1", + # {"id": self._get_random_vertex()}, + # ) + + # OK + def benchmark__match__pattern_short(self): + return ( + "MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1", + {"id": self._get_random_vertex()}, + ) + + # OK + def benchmark__match__vertex_on_label_property(self): + return ( + "MATCH (n:User) WITH n WHERE n.id = $id RETURN n", + {"id": self._get_random_vertex()}, + ) + + # OK + def benchmark__match__vertex_on_label_property_index(self): + return ("MATCH (n:User {id: $id}) RETURN n", {"id": self._get_random_vertex()}) + + # OK + def benchmark__match__vertex_on_property(self): + return ("MATCH (n:User {id: $id}) RETURN n", {"id": self._get_random_vertex()}) + + # OK + def benchmark__update__vertex_on_property(self): + return ( + "MATCH (n {id: $id}) SET n.property = -1", + {"id": self._get_random_vertex()}, + ) + + # Basic benchmark queries + + # OK + def 
benchmark__basic__aggregate_aggregate(self): + return ("MATCH (n:User) RETURN n.age, COUNT(*)", {}) + + # OK + def benchmark__basic__aggregate_count_aggregate(self): + return ("MATCH (n) RETURN count(n), count(n.age)", {}) + + # OK + def benchmark__basic__aggregate_with_filter_aggregate(self): + return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {}) + + # OK + def benchmark__basic__min_max_avg_aggregate(self): + return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {}) + + # NOT OK + # def benchmark__basic__expansion_1_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__expansion_1_with_filter_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-->(n:User) " "WHERE n.age >= 18 " "RETURN n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__expansion_2_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__expansion_2_with_filter_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__expansion_3_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->()-->(n:User) " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__expansion_3_with_filter_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__expansion_4_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__expansion_4_with_filter_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__neighbours_2_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__neighbours_2_with_filter_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__neighbours_2_with_data_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id, n", + # {"id": self._get_random_vertex()}, + # ) + + # NOT OK + # def benchmark__basic__neighbours_2_with_data_and_filter_analytical(self): + # return ( + # "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id, n", + # {"id": self._get_random_vertex()}, + # ) + + # OK + def benchmark__basic__pattern_cycle_analytical(self): + return ( + "MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2", + {"id": self._get_random_vertex()}, + ) + + # OK + def benchmark__basic__pattern_long_analytical(self): + return ( + "MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->" "(n3)-[e3]->(n4)<-[e4]-(n5) " "RETURN n5 LIMIT 1", + {"id": self._get_random_vertex()}, + ) + + # OK + def 
benchmark__basic__pattern_short_analytical(self): + return ( + "MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1", + {"id": self._get_random_vertex()}, + ) + + # OK + def benchmark__basic__single_vertex_property_update_update(self): + return ( + "MATCH (n:User {id: $id}) SET n.property = -1", + {"id": self._get_random_vertex()}, + ) + + # OK + def benchmark__basic__single_vertex_read_read(self): + return ("MATCH (n:User {id : $id}) RETURN n", {"id": self._get_random_vertex()}) + + # OK + def benchmark__basic__single_vertex_write_write(self): + return ( + "CREATE (n:UserTemp {id : $id}) RETURN n", + {"id": random.randint(1, self._num_vertices * 10)}, + ) + + # OK + def benchmark__basic__single_edge_write_write(self): + vertex_from, vertex_to = self._get_random_from_to() + return ( + "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "CREATE (n)-[e:Temp]->(m) RETURN e", + {"from": vertex_from, "to": vertex_to}, + ) diff --git a/tests/mgbench/workloads/importers/disk_importer_pokec.py b/tests/mgbench/workloads/importers/disk_importer_pokec.py new file mode 100644 index 000000000..560d7da9e --- /dev/null +++ b/tests/mgbench/workloads/importers/disk_importer_pokec.py @@ -0,0 +1,62 @@ +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +from pathlib import Path + +import log +from benchmark_context import BenchmarkContext +from constants import * +from runners import BaseRunner + + +class ImporterPokec: + def __init__( + self, + benchmark_context: BenchmarkContext, + dataset_name: str, + variant: str, + index_file: str, + dataset_nodes_file: str, + dataset_edges_file: str, + ) -> None: + self._benchmark_context = benchmark_context + self._dataset_name = dataset_name + self._variant = variant + self._index_file = index_file + self._dataset_nodes_file = dataset_nodes_file + self._dataset_edges_file = dataset_edges_file + + def execute_import(self): + if self._benchmark_context.vendor_name == "neo4j": + neo4j_dump = Path() / ".cache" / "datasets" / self._dataset_name / self._variant / "neo4j.dump" + vendor_runner = BaseRunner.create( + benchmark_context=self._benchmark_context, + ) + vendor_runner.clean_db() + if neo4j_dump.exists(): + log.log("Loading database from existing dump...") + vendor_runner.load_db_from_dump(path=neo4j_dump.parent) + else: + client = vendor_runner.fetch_client() + vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT) + log.log("Executing database index setup") + client.execute(file_path=self._index_file, num_workers=1) + log.log("Started importing dataset") + client.execute( + file_path=self._dataset_nodes_file, num_workers=self._benchmark_context.num_workers_for_import + ) + client.execute( + file_path=self._dataset_edges_file, num_workers=self._benchmark_context.num_workers_for_import + ) + vendor_runner.stop_db_init(VENDOR_RUNNER_IMPORT) + return True + else: + return False diff --git a/tools/bench-graph-client/main.py b/tools/bench-graph-client/main.py index edb09ecde..6456ae10c 100755 --- a/tools/bench-graph-client/main.py +++ b/tools/bench-graph-client/main.py @@ -20,9 +20,7 
@@ GITHUB_REPOSITORY = os.getenv("GITHUB_REPOSITORY", "") GITHUB_SHA = os.getenv("GITHUB_SHA", "") GITHUB_REF = os.getenv("GITHUB_REF", "") -BENCH_GRAPH_SERVER_ENDPOINT = os.getenv( - "BENCH_GRAPH_SERVER_ENDPOINT", - "http://bench-graph-api:9001") +BENCH_GRAPH_SERVER_ENDPOINT = os.getenv("BENCH_GRAPH_SERVER_ENDPOINT", "http://bench-graph-api:9001") log = logging.getLogger(__name__) @@ -30,7 +28,9 @@ log = logging.getLogger(__name__) def parse_args(): argp = ArgumentParser(description=__doc__) argp.add_argument("--benchmark-name", type=str, required=True) - argp.add_argument("--benchmark-results-path", type=str, required=True) + argp.add_argument("--benchmark-results", type=str, required=True) + argp.add_argument("--benchmark-results-in-memory-analytical-path", type=str, required=False) + argp.add_argument("--benchmark-results-on-disk-txn-path", type=str, required=False) argp.add_argument("--github-run-id", type=int, required=True) argp.add_argument("--github-run-number", type=int, required=True) argp.add_argument("--head-branch-name", type=str, required=True) @@ -38,26 +38,45 @@ def parse_args(): def post_measurement(args): - with open(args.benchmark_results_path, "r") as f: - data = json.load(f) - timestamp = datetime.now().timestamp() - req = requests.post( - f"{BENCH_GRAPH_SERVER_ENDPOINT}/measurements", - json={ - "name": args.benchmark_name, - "timestamp": timestamp, - "git_repo": GITHUB_REPOSITORY, - "git_ref": GITHUB_REF, - "git_sha": GITHUB_SHA, - "github_run_id": args.github_run_id, - "github_run_number": args.github_run_number, - "results": data, - "git_branch": args.head_branch_name}, - timeout=1) - assert req.status_code == 200, \ - f"Uploading {args.benchmark_name} data failed." - log.info(f"{args.benchmark_name} data sent to " - f"{BENCH_GRAPH_SERVER_ENDPOINT}") + timestamp = datetime.now().timestamp() + with open(args.benchmark_results, "r") as in_memory_txn_file: + in_memory_txn_data = json.load(in_memory_txn_file) + + in_memory_analytical_data = None + if args.benchmark_results_in_memory_analytical_path is not None: + try: + with open(args.benchmark_results_in_memory_analytical_path, "r") as in_memory_analytical_file: + in_memory_analytical_data = json.load(in_memory_analytical_file) + except IOError: + log.error(f"Failed to load {args.benchmark_results_in_memory_analytical_path}.") + + on_disk_txn_data = None + if args.benchmark_results_on_disk_txn_path is not None: + try: + with open(args.benchmark_results_on_disk_txn_path, "r") as on_disk_txn_file: + on_disk_txn_data = json.load(on_disk_txn_file) + except IOError: + log.error(f"Failed to load {args.benchmark_results_on_disk_txn_path}.") + + req = requests.post( + f"{BENCH_GRAPH_SERVER_ENDPOINT}/measurements", + json={ + "name": args.benchmark_name, + "timestamp": timestamp, + "git_repo": GITHUB_REPOSITORY, + "git_ref": GITHUB_REF, + "git_sha": GITHUB_SHA, + "github_run_id": args.github_run_id, + "github_run_number": args.github_run_number, + "results": in_memory_txn_data, + "in_memory_analytical_results": in_memory_analytical_data, + "on_disk_txn_results": on_disk_txn_data, + "git_branch": args.head_branch_name, + }, + timeout=1, + ) + assert req.status_code == 200, f"Uploading {args.benchmark_name} data failed." + log.info(f"{args.benchmark_name} data sent to " f"{BENCH_GRAPH_SERVER_ENDPOINT}") if __name__ == "__main__":
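
The base-class hunk above routes dataset preparation through separate node and edge paths whenever disk_workload is set. Below is a minimal sketch of a disk-mode workload mirroring the attribute surface disk_pokec.py uses; the class name, URLs, and sizes are placeholders, and the base class may validate attributes this hunk does not show, so treat it as a sketch rather than a drop-in workload:

from benchmark_context import BenchmarkContext  # assumes tests/mgbench is on PYTHONPATH
from workloads.base import Workload


class TinyDiskWorkload(Workload):
    NAME = "tiny_disk"
    VARIANTS = ["small"]
    DEFAULT_VARIANT = "small"
    FILE = None

    # Placeholder URLs -- the contract is one nodes file and one edges file
    # per variant, downloaded and cached by the base class.
    URL_FILE_NODES = {"small": "https://example.com/tiny_nodes.cypher"}
    URL_FILE_EDGES = {"small": "https://example.com/tiny_edges.cypher"}
    SIZES = {"small": {"vertices": 10, "edges": 20}}
    PROPERTIES_ON_EDGES = False

    def __init__(self, variant: str = None, benchmark_context: BenchmarkContext = None):
        # disk_workload=True makes prepare() take the node/edge download path,
        # after which get_node_file()/get_edge_file() are valid and get_file() raises.
        super().__init__(variant, benchmark_context=benchmark_context, disk_workload=True)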
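
The query methods in disk_pokec.py follow mgbench's benchmark__<group>__<name> naming convention, and the harness appears to pick them up by reflection rather than through an explicit registry. A self-contained illustration of that convention follows; the Probe class and the discovery loop are illustrative stand-ins, not mgbench code:

import random


class Probe:
    _num_vertices = 10000

    def benchmark__basic__single_vertex_read_read(self):
        return ("MATCH (n:User {id : $id}) RETURN n", {"id": random.randint(1, self._num_vertices)})


probe = Probe()
for name in dir(probe):
    if not name.startswith("benchmark__"):
        continue
    # "benchmark__basic__single_vertex_read_read" -> group "basic",
    # query "single_vertex_read_read"
    _, group, query = name.split("__")
    cypher, params = getattr(probe, name)()
    print(group, query, cypher, params)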
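
ImporterPokec.execute_import() only handles neo4j (preferring a cached dump over a fresh index-plus-import run) and returns False for every other vendor, which implies the caller treats False as "fall back to the generic import path". A hedged sketch of that call-site contract; import_dataset and its parameters are illustrative, not the actual benchmark.py code. Note that the node file must be replayed before the edge file, since the edge file's CREATE statements MATCH their endpoint vertices by id:

def import_dataset(workload, client, num_workers: int) -> None:
    if workload.custom_import():
        return  # vendor-specific path (e.g. a cached neo4j dump) handled everything
    # Generic path: nodes first, then edges, so edge creation finds its endpoints.
    client.execute(file_path=workload.get_node_file(), num_workers=num_workers)
    client.execute(file_path=workload.get_edge_file(), num_workers=num_workers)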
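
One detail worth calling out in the bench-graph-client change: argparse converts dashes to underscores, so --benchmark-results-in-memory-analytical-path surfaces as args.benchmark_results_in_memory_analytical_path inside post_measurement(), and both new flags default to None when omitted. A quick self-contained check:

from argparse import ArgumentParser

argp = ArgumentParser()
argp.add_argument("--benchmark-results-in-memory-analytical-path", type=str, required=False)
argp.add_argument("--benchmark-results-on-disk-txn-path", type=str, required=False)

args = argp.parse_args(["--benchmark-results-in-memory-analytical-path", "analytical.json"])
assert args.benchmark_results_in_memory_analytical_path == "analytical.json"
assert args.benchmark_results_on_disk_txn_path is None  # optional flags default to None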