Add mgbench support for disk storage and analytical mode (#1286)

* Add mgbench support for disk storage and analytical mode
Authored by Andi on 2023-10-06 10:19:29 +02:00, committed by GitHub
parent 3cc2bc2791
commit 2fd34489af
13 changed files with 1185 additions and 405 deletions

File: CI benchmark workflow

@ -417,11 +417,12 @@ jobs:
source ve3/bin/activate
pip install -r requirements.txt
./main.py --benchmark-name "macro_benchmark" \
--benchmark-results-path "../../tests/macro_benchmark/.harness_summary" \
--benchmark-results "../../tests/macro_benchmark/.harness_summary" \
--github-run-id "${{ github.run_id }}" \
--github-run-number "${{ github.run_number }}" \
--head-branch-name "${{ env.BRANCH_NAME }}"
# TODO (andi) No need for path flags and for --disk-storage and --in-memory-analytical
- name: Run mgbench
run: |
cd tests/mgbench
@ -434,7 +435,7 @@ jobs:
source ve3/bin/activate
pip install -r requirements.txt
./main.py --benchmark-name "mgbench" \
--benchmark-results-path "../../tests/mgbench/benchmark_result.json" \
--benchmark-results "../../tests/mgbench/benchmark_result.json" \
--github-run-id "${{ github.run_id }}" \
--github-run-number "${{ github.run_number }}" \
--head-branch-name "${{ env.BRANCH_NAME }}"

File: benchgraph README

@ -342,6 +342,12 @@ Benchgraph is currently a passive benchmark since resource usage and saturation
Latest version: https://memgraph.com/benchgraph
### Release v3 (latest) - 2023-10-02
- Improvements made to support Memgraph's internal performance testing.
- https://github.com/memgraph/memgraph/pull/1286
- https://github.com/memgraph/memgraph/pull/1280
### Release v2 - 2023-04-25
- Benchmark process changes:

File: mgbench benchmark script

@ -19,6 +19,7 @@ import platform
import random
import sys
import time
from copy import deepcopy
from typing import Dict, List
import helpers
@ -26,54 +27,11 @@ import log
import runners
import setup
from benchmark_context import BenchmarkContext
from benchmark_results import BenchmarkResults
from constants import *
from workload_mode import BENCHMARK_MODE_MIXED, BENCHMARK_MODE_REALISTIC
from workloads import *
WITH_FINE_GRAINED_AUTHORIZATION = "with_fine_grained_authorization"
WITHOUT_FINE_GRAINED_AUTHORIZATION = "without_fine_grained_authorization"
RUN_CONFIGURATION = "__run_configuration__"
IMPORT = "__import__"
THROUGHPUT = "throughput"
DATABASE = "database"
MEMORY = "memory"
VENDOR = "vendor"
CONDITION = "condition"
NUM_WORKERS_FOR_BENCHMARK = "num_workers_for_benchmark"
SINGLE_THREADED_RUNTIME_SEC = "single_threaded_runtime_sec"
BENCHMARK_MODE = "benchmark_mode"
BENCHMARK_MODE_CONFIG = "benchmark_mode_config"
PLATFORM = "platform"
COUNT = "count"
DURATION = "duration"
NUM_WORKERS = "num_workers"
CPU = "cpu"
MEMORY = "memory"
VENDOR_RUNNER_IMPORT = "import"
VENDOR_RUNNER_AUTHORIZATION = "authorization"
CLIENT = "client"
DATABASE = "database"
CUSTOM_LOAD = "custom_load"
RETRIES = "retries"
DOCKER = "docker"
METADATA = "metadata"
LATENCY_STATS = "latency_stats"
ITERATIONS = "iterations"
USERNAME = "user"
PASSWORD = "test"
DATABASE_CONDITION_HOT = "hot"
DATABASE_CONDITION_VULCANIC = "vulcanic"
WRITE_TYPE_QUERY = "write"
READ_TYPE_QUERY = "read"
UPDATE_TYPE_QUERY = "update"
ANALYTICAL_TYPE_QUERY = "analytical"
QUERY = "query"
CACHE = "cache"
DISK_PREPARATION_RSS = "disk_storage_preparation"
IN_MEMORY_ANALYTICAL_RSS = "in_memory_analytical_preparation"
IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL"
WARMUP_TO_HOT_QUERIES = [
("CREATE ();", {}),
("CREATE ()-[:TempEdge]->();", {}),
@ -161,7 +119,19 @@ def parse_args():
benchmark_parser.add_argument(
"--export-results",
default=None,
help="file path into which results should be exported",
help="file path into which results for in_memory_transactional storage mode should be exported",
)
benchmark_parser.add_argument(
"--export-results-in-memory-analytical",
default=None,
help="File path into which results for in_memory_analytical storage mode should be exported. If set, benchmarks for analytical mode will be run.",
)
benchmark_parser.add_argument(
"--export-results-on-disk-txn",
default=None,
help="File path into which results for on_disk_transactional storage mode should be exported. If set, benchmarks for disk storage will be run.",
)
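# A hedged usage sketch (the file names below are illustrative, not from this
# PR): each export flag names the output JSON file for one storage mode, and
# the two new flags additionally enable the corresponding extra benchmark run.
#   --export-results results_in_memory_txn.json
#   --export-results-in-memory-analytical results_in_memory_analytical.json
#   --export-results-on-disk-txn results_on_disk_txn.json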
benchmark_parser.add_argument(
@ -220,20 +190,6 @@ def parse_args():
benchmark_parser.add_argument("--customer-workloads", default=None, help="Path to customers workloads")
benchmark_parser.add_argument(
"--disk-storage",
action="store_true",
default=False,
help="If the flag set, benchmarks will be run also for disk storage.",
)
benchmark_parser.add_argument(
"--in-memory-analytical",
action="store_true",
default=False,
help="If the flag set, benchmarks will be run also for in_memory_analytical.",
)
benchmark_parser.add_argument(
"--vendor-specific",
nargs="*",
@ -279,6 +235,17 @@ def parse_args():
return parser.parse_args()
def sanitize_args(args):
assert args.benchmarks != None, helpers.list_available_workloads()
assert args.num_workers_for_import > 0
assert args.num_workers_for_benchmark > 0
assert args.export_results != None, "Pass the file path where results will be saved"
assert args.single_threaded_runtime_sec >= 1, "Low runtime value, consider extending time for more accurate results"
assert (
args.workload_realistic == None or args.workload_mixed == None
), "Cannot run both realistic and mixed workload, only one mode run at the time"
def get_queries(gen, count):
random.seed(gen.__name__)
ret = []
@ -287,21 +254,6 @@ def get_queries(gen, count):
return ret
def warmup(condition: str, client: runners.BaseRunner, queries: list = None):
if condition == DATABASE_CONDITION_HOT:
log.log("Execute warm-up to match condition: {} ".format(condition))
client.execute(
queries=WARMUP_TO_HOT_QUERIES,
num_workers=1,
)
elif condition == DATABASE_CONDITION_VULCANIC:
log.log("Execute warm-up to match condition: {} ".format(condition))
client.execute(queries=queries)
else:
log.log("No warm-up on condition: {} ".format(condition))
log.log("Finished warm-up procedure to match database condition: {} ".format(condition))
def validate_workload_distribution(percentage_distribution, queries_by_type):
percentages_by_type = {
WRITE_TYPE_QUERY: percentage_distribution[0],
@ -360,7 +312,13 @@ def prepare_for_workload(benchmark_context, dataset, group, queries):
def realistic_workload(
vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
vendor: runners.BaseRunner,
client: runners.BaseClient,
dataset,
group,
queries,
benchmark_context: BenchmarkContext,
results,
):
log.log("Executing realistic workload...")
config_distribution, queries_by_type, _, percentage_distribution, num_of_queries = prepare_for_workload(
@ -407,7 +365,13 @@ def realistic_workload(
def mixed_workload(
vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
vendor: runners.BaseRunner,
client: runners.BaseClient,
dataset,
group,
queries,
benchmark_context: BenchmarkContext,
results,
):
log.log("Executing mixed workload...")
(
@ -467,6 +431,21 @@ def mixed_workload(
results.set_value(*results_key, value=ret)
def warmup(condition: str, client: runners.BaseRunner, queries: list = None):
if condition == DATABASE_CONDITION_HOT:
log.log("Execute warm-up to match condition: {} ".format(condition))
client.execute(
queries=WARMUP_TO_HOT_QUERIES,
num_workers=1,
)
elif condition == DATABASE_CONDITION_VULCANIC:
log.log("Execute warm-up to match condition: {} ".format(condition))
client.execute(queries=queries)
else:
log.log("No warm-up on condition: {} ".format(condition))
log.log("Finished warm-up procedure to match database condition: {} ".format(condition))
def get_query_cache_count(
vendor: runners.BaseRunner,
client: runners.BaseClient,
@ -477,16 +456,12 @@ def get_query_cache_count(
query: str,
func: str,
):
log.init("Determining query count for benchmark based on --single-threaded-runtime argument")
log.init(
f"Determining query count for benchmark based on --single-threaded-runtime argument = {benchmark_context.single_threaded_runtime_sec}s"
)
config_key = [workload.NAME, workload.get_variant(), group, query]
cached_count = config.get_value(*config_key)
if cached_count is None:
log.info(
"Determining the number of queries necessary for {} seconds of single-threaded runtime...".format(
benchmark_context.single_threaded_runtime_sec
)
)
log.log("Running query to prime the query cache...")
vendor.start_db(CACHE)
client.execute(queries=queries, num_workers=1)
count = 1
@ -495,18 +470,16 @@ def get_query_cache_count(
duration = ret[0][DURATION]
should_execute = int(benchmark_context.single_threaded_runtime_sec / (duration / count))
log.log(
"executed_queries={}, total_duration={}, query_duration={}, estimated_count={}".format(
"Executed_queries={}, total_duration={}, query_duration={}, estimated_count={}".format(
count, duration, duration / count, should_execute
)
)
# We don't have to execute the next iteration when
# `should_execute` becomes the same order of magnitude as
# `count * 10`.
if should_execute / (count * 10) < 10:
count = should_execute
break
else:
count = count * 10
vendor.stop_db(CACHE)
if count < benchmark_context.query_count_lower_bound:
@ -529,38 +502,6 @@ def get_query_cache_count(
return count
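To make the estimation loop above concrete, here is a worked example with purely illustrative numbers (assumed values, not measurements from any real run):

# Suppose 10 queries take 0.05 s single-threaded and single_threaded_runtime_sec is 10.
count, duration, runtime_sec = 10, 0.05, 10
should_execute = int(runtime_sec / (duration / count))  # int(10 / 0.005) = 2000
# 2000 / (10 * 10) = 20, which is not < 10, so count is multiplied by 10 and the loop repeats;
# on the next pass 2000 / (100 * 10) = 2 < 10, so count becomes 2000 and the loop breaks.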
def log_benchmark_summary(results: Dict):
log.init("~" * 45)
log.info("Benchmark finished.")
log.init("~" * 45)
log.log("\n")
log.summary("Benchmark summary")
log.log("-" * 90)
log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage"))
for dataset, variants in results.items():
if dataset == RUN_CONFIGURATION:
continue
for groups in variants.values():
for group, queries in groups.items():
if group == IMPORT:
continue
for query, auth in queries.items():
for value in auth.values():
log.log("-" * 90)
log.summary(
"{:<20} {:>26.2f} QPS {:>27.2f} MB".format(
query, value[THROUGHPUT], value[DATABASE][MEMORY] / (1024.0 * 1024.0)
)
)
log.log("-" * 90)
def log_benchmark_arguments(benchmark_context):
log.init("Executing benchmark with following arguments: ")
for key, value in benchmark_context.__dict__.items():
log.log("{:<30} : {:<30}".format(str(key), str(value)))
def check_benchmark_requirements(benchmark_context):
if setup.check_requirements(benchmark_context):
log.success("Requirements for starting benchmark satisfied!")
@ -577,19 +518,6 @@ def setup_cache_config(benchmark_context, cache):
return helpers.RecursiveDict()
def sanitize_args(args):
assert args.benchmarks != None, helpers.list_available_workloads()
assert args.num_workers_for_import > 0
assert args.num_workers_for_benchmark > 0
assert args.export_results != None, "Pass the file path where results will be saved"
assert (
args.single_threaded_runtime_sec >= 10
), "Low runtime value, consider extending time for more accurate results"
assert (
args.workload_realistic == None or args.workload_mixed == None
), "Cannot run both realistic and mixed workload, only one mode run at the time"
def save_import_results(workload, results, import_results, rss_usage):
log.info("Summarized importing benchmark results:")
import_key = [workload.NAME, workload.get_variant(), IMPORT]
@ -597,21 +525,279 @@ def save_import_results(workload, results, import_results, rss_usage):
# Display import statistics.
for row in import_results:
log.success(
"Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format(
row[COUNT], row[DURATION], row[NUM_WORKERS], row[THROUGHPUT]
)
f"Executed {row[COUNT]} queries in {row[DURATION]} seconds using {row[NUM_WORKERS]} workers with a total throughput of {row[THROUGHPUT]} Q/S."
)
log.success(
"The database used {} seconds of CPU time and peaked at {} MiB of RAM".format(
rss_usage[CPU], rss_usage[MEMORY] / (1024 * 1024)
)
f"The database used {rss_usage[CPU]} seconds of CPU time and peaked at {rss_usage[MEMORY] / (1024 * 1024)} MiB of RAM"
)
results.set_value(*import_key, value={CLIENT: import_results, DATABASE: rss_usage})
else:
results.set_value(*import_key, value={CLIENT: CUSTOM_LOAD, DATABASE: CUSTOM_LOAD})
def save_to_results(results, ret, workload, group, query, authorization_mode):
results_key = [
workload.NAME,
workload.get_variant(),
group,
query,
authorization_mode,
]
results.set_value(*results_key, value=ret)
def run_isolated_workload_with_authorization(vendor_runner, client, queries, group, workload, results):
log.init("Running isolated workload with authorization")
log.info("Running preprocess AUTH queries")
vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
client.execute(queries=SETUP_AUTH_QUERIES)
client.set_credentials(username=USERNAME, password=PASSWORD)
vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
for query, funcname in queries[group]:
log.init("Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION))
func = getattr(workload, funcname)
count = get_query_cache_count(
vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
)
vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
start_time = time.time()
warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
ret = client.execute(
queries=get_queries(func, count),
num_workers=benchmark_context.num_workers_for_benchmark,
)[0]
usage = vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
time_elapsed = time.time() - start_time
log.info(f"Benchmark execution of query {funcname} finished in {time_elapsed} seconds.")
ret[DATABASE] = usage
log_metrics_summary(ret, usage)
log_metadata_summary(ret)
log.success("Throughput: {:02f} QPS".format(ret[THROUGHPUT]))
save_to_results(results, ret, workload, group, query, WITH_FINE_GRAINED_AUTHORIZATION)
vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
log.info("Running cleanup of auth queries")
ret = client.execute(queries=CLEANUP_AUTH_QUERIES)
vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
def run_isolated_workload_without_authorization(vendor_runner, client, queries, group, workload, results):
log.init("Running isolated workload without authorization")
for query, funcname in queries[group]:
log.init(
"Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITHOUT_FINE_GRAINED_AUTHORIZATION),
)
func = getattr(workload, funcname)
count = get_query_cache_count(
vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
)
# Benchmark run.
sample_query = get_queries(func, 1)[0][0]
log.info("Sample query:{}".format(sample_query))
log.log(
"Executing benchmark with {} queries that should yield a single-threaded runtime of {} seconds.".format(
count, benchmark_context.single_threaded_runtime_sec
)
)
log.log("Queries are executed using {} concurrent clients".format(benchmark_context.num_workers_for_benchmark))
start_time = time.time()
rss_db = workload.NAME + workload.get_variant() + "_" + "_" + benchmark_context.mode + "_" + query
vendor_runner.start_db(rss_db)
warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
log.init("Executing benchmark queries...")
ret = client.execute(
queries=get_queries(func, count),
num_workers=benchmark_context.num_workers_for_benchmark,
time_dependent_execution=benchmark_context.time_dependent_execution,
)[0]
time_elapsed = time.time() - start_time
log.info(f"Benchmark execution of query {funcname} finished in {time_elapsed} seconds.")
usage = vendor_runner.stop_db(rss_db)
ret[DATABASE] = usage
log_output_summary(benchmark_context, ret, usage, funcname, sample_query)
save_to_results(results, ret, workload, group, query, WITHOUT_FINE_GRAINED_AUTHORIZATION)
def setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload, storage_mode):
vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT)
log.info("Executing database index setup")
start_time = time.time()
if generated_queries:
client.execute(queries=workload.indexes_generator(), num_workers=1)
log.info("Finished setting up indexes.")
log.info("Started importing dataset")
import_results = client.execute(queries=generated_queries, num_workers=benchmark_context.num_workers_for_import)
else:
log.info("Using workload information for importing dataset and creating indices")
log.info("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
imported = workload.custom_import()
if not imported:
client.execute(file_path=workload.get_index(), num_workers=1)
log.info("Finished setting up indexes.")
log.info("Started importing dataset")
if storage_mode == ON_DISK_TRANSACTIONAL:
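# On-disk storage imports from two files, nodes first and edges second, so the
# edge-creation queries can reference nodes that already exist.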
import_results = client.execute(file_path=workload.get_node_file(), num_workers=1)
import_results = client.execute(file_path=workload.get_edge_file(), num_workers=1)
else:
import_results = client.execute(
file_path=workload.get_file(), num_workers=benchmark_context.num_workers_for_import
)
else:
log.info("Custom import executed")
log.info(f"Finished importing dataset in {time.time() - start_time}s")
rss_usage = vendor_runner.stop_db_init(VENDOR_RUNNER_IMPORT)
return import_results, rss_usage
def run_target_workload(benchmark_context, workload, bench_queries, vendor_runner, client, results, storage_mode):
generated_queries = workload.dataset_generator()
import_results, rss_usage = setup_indices_and_import_dataset(
client, vendor_runner, generated_queries, workload, storage_mode
)
save_import_results(workload, results, import_results, rss_usage)
for group in sorted(bench_queries.keys()):
log.init(f"\nRunning benchmark in {benchmark_context.mode} workload mode for {group} group")
if benchmark_context.mode == BENCHMARK_MODE_MIXED:
mixed_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context, results)
elif benchmark_context.mode == BENCHMARK_MODE_REALISTIC:
realistic_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context, results)
else:
run_isolated_workload_without_authorization(vendor_runner, client, bench_queries, group, workload, results)
if benchmark_context.no_authorization:
run_isolated_workload_with_authorization(vendor_runner, client, bench_queries, group, workload, results)
# TODO: (andi) Reorder functions top-down to improve readability
def run_target_workloads(benchmark_context, target_workloads, bench_results):
for workload, bench_queries in target_workloads:
log.info(f"Started running {str(workload.NAME)} workload")
benchmark_context.set_active_workload(workload.NAME)
benchmark_context.set_active_variant(workload.get_variant())
if workload.is_disk_workload() and benchmark_context.export_results_on_disk_txn:
run_on_disk_transactional_benchmark(benchmark_context, workload, bench_queries, bench_results.disk_results)
else:
run_in_memory_transactional_benchmark(
benchmark_context, workload, bench_queries, bench_results.in_memory_txn_results
)
if benchmark_context.export_results_in_memory_analytical:
run_in_memory_analytical_benchmark(
benchmark_context, workload, bench_queries, bench_results.in_memory_analytical_results
)
def run_on_disk_transactional_benchmark(benchmark_context, workload, bench_queries, disk_results):
log.info(f"Running benchmarks for {ON_DISK_TRANSACTIONAL} storage mode.")
disk_vendor_runner, disk_client = client_runner_factory(benchmark_context)
disk_vendor_runner.start_db(DISK_PREPARATION_RSS)
disk_client.execute(queries=SETUP_DISK_STORAGE)
disk_vendor_runner.stop_db(DISK_PREPARATION_RSS)
run_target_workload(
benchmark_context, workload, bench_queries, disk_vendor_runner, disk_client, disk_results, ON_DISK_TRANSACTIONAL
)
log.info(f"Finished running benchmarks for {ON_DISK_TRANSACTIONAL} storage mode.")
def run_in_memory_analytical_benchmark(benchmark_context, workload, bench_queries, in_memory_analytical_results):
log.info(f"Running benchmarks for {IN_MEMORY_ANALYTICAL} storage mode.")
in_memory_analytical_vendor_runner, in_memory_analytical_client = client_runner_factory(benchmark_context)
in_memory_analytical_vendor_runner.start_db(IN_MEMORY_ANALYTICAL_RSS)
in_memory_analytical_client.execute(queries=SETUP_IN_MEMORY_ANALYTICAL_STORAGE_MODE)
in_memory_analytical_vendor_runner.stop_db(IN_MEMORY_ANALYTICAL_RSS)
run_target_workload(
benchmark_context,
workload,
bench_queries,
in_memory_analytical_vendor_runner,
in_memory_analytical_client,
in_memory_analytical_results,
IN_MEMORY_ANALYTICAL,
)
log.info(f"Finished running benchmarks for {IN_MEMORY_ANALYTICAL} storage mode.")
def run_in_memory_transactional_benchmark(benchmark_context, workload, bench_queries, in_memory_txn_results):
log.info(f"Running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.")
in_memory_txn_vendor_runner, in_memory_txn_client = client_runner_factory(benchmark_context)
run_target_workload(
benchmark_context,
workload,
bench_queries,
in_memory_txn_vendor_runner,
in_memory_txn_client,
in_memory_txn_results,
IN_MEMORY_TRANSACTIONAL,
)
log.info(f"Finished running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.")
def client_runner_factory(benchmark_context):
vendor_runner = runners.BaseRunner.create(benchmark_context=benchmark_context)
vendor_runner.clean_db()
log.log("Database cleaned from any previous data")
client = vendor_runner.fetch_client()
return vendor_runner, client
def validate_target_workloads(benchmark_context, target_workloads):
if len(target_workloads) == 0:
log.error("No workloads matched the pattern: " + str(benchmark_context.benchmark_target_workload))
log.error("Please check the pattern and workload NAME property, query group and query name.")
log.info("Currently available workloads: ")
log.log(helpers.list_available_workloads(benchmark_context.customer_workloads))
sys.exit(1)
def log_benchmark_summary(results: Dict, storage_mode):
log.log("\n")
log.summary(f"Benchmark summary of {storage_mode} mode")
log.log("-" * 120)
log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage"))
for dataset, variants in results.items():
if dataset == RUN_CONFIGURATION:
continue
for groups in variants.values():
for group, queries in groups.items():
if group == IMPORT:
continue
for query, auth in queries.items():
for value in auth.values():
log.log("-" * 120)
log.summary(
"{:<20} {:>26.2f} QPS {:>27.2f} MB".format(
query, value[THROUGHPUT], value[DATABASE][MEMORY] / (1024.0 * 1024.0)
)
)
log.log("-" * 90)
def log_benchmark_arguments(benchmark_context):
log.init("Executing benchmark with following arguments: ")
for key, value in benchmark_context.__dict__.items():
log.log("{:<30} : {:<30}".format(str(key), str(value)))
def log_metrics_summary(ret, usage):
log.log("Executed {} queries in {} seconds.".format(ret[COUNT], ret[DURATION]))
log.log("Queries have been retried {} times".format(ret[RETRIES]))
@ -645,175 +831,6 @@ def log_output_summary(benchmark_context, ret, usage, funcname, sample_query):
log.success("Throughput: {:02f} QPS\n\n".format(ret[THROUGHPUT]))
def save_to_results(results, ret, workload, group, query, authorization_mode):
results_key = [
workload.NAME,
workload.get_variant(),
group,
query,
authorization_mode,
]
results.set_value(*results_key, value=ret)
def run_isolated_workload_with_authorization(vendor_runner, client, queries, group, workload):
log.init("Running isolated workload with authorization")
log.info("Running preprocess AUTH queries")
vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
client.execute(queries=SETUP_AUTH_QUERIES)
client.set_credentials(username=USERNAME, password=PASSWORD)
vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
for query, funcname in queries[group]:
log.init("Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION))
func = getattr(workload, funcname)
count = get_query_cache_count(
vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
)
vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
ret = client.execute(
queries=get_queries(func, count),
num_workers=benchmark_context.num_workers_for_benchmark,
)[0]
usage = vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
ret[DATABASE] = usage
log_metrics_summary(ret, usage)
log_metadata_summary(ret)
log.success("Throughput: {:02f} QPS".format(ret[THROUGHPUT]))
save_to_results(results, ret, workload, group, query, WITH_FINE_GRAINED_AUTHORIZATION)
vendor_runner.start_db(VENDOR_RUNNER_AUTHORIZATION)
log.info("Running cleanup of auth queries")
ret = client.execute(queries=CLEANUP_AUTH_QUERIES)
vendor_runner.stop_db(VENDOR_RUNNER_AUTHORIZATION)
def run_isolated_workload_without_authorization(vendor_runner, client, queries, group, workload):
log.init("Running isolated workload without authorization")
for query, funcname in queries[group]:
log.init(
"Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITHOUT_FINE_GRAINED_AUTHORIZATION),
)
func = getattr(workload, funcname)
count = get_query_cache_count(
vendor_runner, client, get_queries(func, 1), benchmark_context, workload, group, query, func
)
# Benchmark run.
sample_query = get_queries(func, 1)[0][0]
log.info("Sample query:{}".format(sample_query))
log.log(
"Executing benchmark with {} queries that should yield a single-threaded runtime of {} seconds.".format(
count, benchmark_context.single_threaded_runtime_sec
)
)
log.log("Queries are executed using {} concurrent clients".format(benchmark_context.num_workers_for_benchmark))
rss_db = workload.NAME + workload.get_variant() + "_" + "_" + benchmark_context.mode + "_" + query
vendor_runner.start_db(rss_db)
warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
log.init("Executing benchmark queries...")
ret = client.execute(
queries=get_queries(func, count),
num_workers=benchmark_context.num_workers_for_benchmark,
time_dependent_execution=benchmark_context.time_dependent_execution,
)[0]
log.info("Benchmark execution finished...")
usage = vendor_runner.stop_db(rss_db)
ret[DATABASE] = usage
log_output_summary(benchmark_context, ret, usage, funcname, sample_query)
save_to_results(results, ret, workload, group, query, WITHOUT_FINE_GRAINED_AUTHORIZATION)
def setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload):
vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT)
log.info("Executing database index setup")
if generated_queries:
client.execute(queries=workload.indexes_generator(), num_workers=1)
log.info("Finished setting up indexes.")
log.info("Started importing dataset")
import_results = client.execute(queries=generated_queries, num_workers=benchmark_context.num_workers_for_import)
else:
log.info("Using workload information for importing dataset and creating indices")
log.info("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
imported = workload.custom_import()
if not imported:
client.execute(file_path=workload.get_index(), num_workers=1)
log.info("Finished setting up indexes.")
log.info("Started importing dataset")
import_results = client.execute(
file_path=workload.get_file(), num_workers=benchmark_context.num_workers_for_import
)
else:
log.info("Custom import executed")
log.info("Finished importing dataset")
rss_usage = vendor_runner.stop_db_init(VENDOR_RUNNER_IMPORT)
return import_results, rss_usage
def run_target_workload(benchmark_context, workload, bench_queries, vendor_runner, client):
generated_queries = workload.dataset_generator()
import_results, rss_usage = setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload)
save_import_results(workload, results, import_results, rss_usage)
for group in sorted(bench_queries.keys()):
log.init(f"\nRunning benchmark in {benchmark_context.mode} workload mode for {group} group")
if benchmark_context.mode == BENCHMARK_MODE_MIXED:
mixed_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context)
elif benchmark_context.mode == BENCHMARK_MODE_REALISTIC:
realistic_workload(vendor_runner, client, workload, group, bench_queries, benchmark_context)
else:
run_isolated_workload_without_authorization(vendor_runner, client, bench_queries, group, workload)
if benchmark_context.no_authorization:
run_isolated_workload_with_authorization(vendor_runner, client, bench_queries, group, workload)
# TODO: (andi) Reorder functions top-down to improve readability
def run_target_workloads(benchmark_context, target_workloads):
for workload, bench_queries in target_workloads:
log.info(f"Started running {str(workload.NAME)} workload")
benchmark_context.set_active_workload(workload.NAME)
benchmark_context.set_active_variant(workload.get_variant())
log.info(f"Running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.")
in_memory_txn_vendor_runner, in_memory_txn_client = client_runner_factory(benchmark_context)
run_target_workload(
benchmark_context, workload, bench_queries, in_memory_txn_vendor_runner, in_memory_txn_client
)
log.info(f"Finished running benchmarks for {IN_MEMORY_TRANSACTIONAL} storage mode.")
def client_runner_factory(benchmark_context):
vendor_runner = runners.BaseRunner.create(benchmark_context=benchmark_context)
vendor_runner.clean_db()
log.log("Database cleaned from any previous data")
client = vendor_runner.fetch_client()
return vendor_runner, client
def validate_target_workloads(benchmark_context, target_workloads):
if len(target_workloads) == 0:
log.error("No workloads matched the pattern: " + str(benchmark_context.benchmark_target_workload))
log.error("Please check the pattern and workload NAME property, query group and query name.")
log.info("Currently available workloads: ")
log.log(helpers.list_available_workloads(benchmark_context.customer_workloads))
sys.exit(1)
if __name__ == "__main__":
args = parse_args()
sanitize_args(args)
@ -833,6 +850,8 @@ if __name__ == "__main__":
query_count_lower_bound=args.query_count_lower_bound,
no_load_query_counts=args.no_load_query_counts,
export_results=args.export_results,
export_results_in_memory_analytical=args.export_results_in_memory_analytical,
export_results_on_disk_txn=args.export_results_on_disk_txn,
temporary_directory=temp_dir.absolute(),
workload_mixed=args.workload_mixed,
workload_realistic=args.workload_realistic,
@ -842,8 +861,6 @@ if __name__ == "__main__":
no_authorization=args.no_authorization,
customer_workloads=args.customer_workloads,
vendor_args=vendor_specific_args,
disk_storage=args.disk_storage,
in_memory_analytical=args.in_memory_analytical,
)
log_benchmark_arguments(benchmark_context)
@ -854,7 +871,7 @@ if __name__ == "__main__":
log.log("Cache folder in use: " + cache.get_default_cache_directory())
config = setup_cache_config(benchmark_context, cache)
run_config = {
in_memory_txn_run_config = {
VENDOR: benchmark_context.vendor_name,
CONDITION: benchmark_context.warm_up,
NUM_WORKERS_FOR_BENCHMARK: benchmark_context.num_workers_for_benchmark,
@ -862,10 +879,19 @@ if __name__ == "__main__":
BENCHMARK_MODE: benchmark_context.mode,
BENCHMARK_MODE_CONFIG: benchmark_context.mode_config,
PLATFORM: platform.platform(),
STORAGE_MODE: IN_MEMORY_TRANSACTIONAL,
}
results = helpers.RecursiveDict()
results.set_value(RUN_CONFIGURATION, value=run_config)
in_memory_analytical_run_config = deepcopy(in_memory_txn_run_config)
in_memory_analytical_run_config[STORAGE_MODE] = IN_MEMORY_ANALYTICAL
on_disk_transactional_run_config = deepcopy(in_memory_txn_run_config)
on_disk_transactional_run_config[STORAGE_MODE] = ON_DISK_TRANSACTIONAL
bench_results = BenchmarkResults()
bench_results.in_memory_txn_results.set_value(RUN_CONFIGURATION, value=in_memory_txn_run_config)
bench_results.in_memory_analytical_results.set_value(RUN_CONFIGURATION, value=in_memory_analytical_run_config)
bench_results.disk_results.set_value(RUN_CONFIGURATION, value=on_disk_transactional_run_config)
available_workloads = helpers.get_available_workloads(benchmark_context.customer_workloads)
@ -876,13 +902,22 @@ if __name__ == "__main__":
)
validate_target_workloads(benchmark_context, target_workloads)
run_target_workloads(benchmark_context, target_workloads)
run_target_workloads(benchmark_context, target_workloads, bench_results)
if not benchmark_context.no_save_query_counts:
cache.save_config(config)
log_benchmark_summary(results.get_data())
log_benchmark_summary(bench_results.in_memory_txn_results.get_data(), IN_MEMORY_TRANSACTIONAL)
if benchmark_context.export_results:
with open(benchmark_context.export_results, "w") as f:
json.dump(results.get_data(), f)
json.dump(bench_results.in_memory_txn_results.get_data(), f)
log_benchmark_summary(bench_results.in_memory_analytical_results.get_data(), IN_MEMORY_ANALYTICAL)
if benchmark_context.export_results_in_memory_analytical:
with open(benchmark_context.export_results_in_memory_analytical, "w") as f:
json.dump(bench_results.in_memory_analytical_results.get_data(), f)
log_benchmark_summary(bench_results.disk_results.get_data(), ON_DISK_TRANSACTIONAL)
if benchmark_context.export_results_on_disk_txn:
with open(benchmark_context.export_results_on_disk_txn, "w") as f:
json.dump(bench_results.disk_results.get_data(), f)

File: benchmark_context.py

@ -34,6 +34,8 @@ class BenchmarkContext:
no_load_query_counts: bool = False,
no_save_query_counts: bool = False,
export_results: str = None,
export_results_in_memory_analytical: str = None,
export_results_on_disk_txn: str = None,
temporary_directory: str = None,
workload_mixed: str = None, # Default mode is isolated, mixed None
workload_realistic: str = None, # Default mode is isolated, realistic None
@ -43,8 +45,6 @@ class BenchmarkContext:
no_authorization: bool = True,
customer_workloads: str = None,
vendor_args: dict = {},
disk_storage: bool = False,
in_memory_analytical: bool = False,
) -> None:
self.benchmark_target_workload = benchmark_target_workload
self.vendor_binary = vendor_binary
@ -57,6 +57,8 @@ class BenchmarkContext:
self.no_load_query_counts = no_load_query_counts
self.no_save_query_counts = no_save_query_counts
self.export_results = export_results
self.export_results_in_memory_analytical = export_results_in_memory_analytical
self.export_results_on_disk_txn = export_results_on_disk_txn
self.temporary_directory = temporary_directory
assert (
@ -81,8 +83,6 @@ class BenchmarkContext:
self.vendor_args = vendor_args
self.active_workload = None
self.active_variant = None
self.disk_storage = disk_storage
self.in_memory_analytical = in_memory_analytical
def set_active_workload(self, workload: str) -> None:
self.active_workload = workload

File: benchmark_results.py (new file)

@ -0,0 +1,19 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import helpers
class BenchmarkResults:
def __init__(self) -> None:
self.in_memory_txn_results = helpers.RecursiveDict()
self.in_memory_analytical_results = helpers.RecursiveDict()
self.disk_results = helpers.RecursiveDict()
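A minimal usage sketch (run_config stands for the per-mode configuration dict built in the entry script; RUN_CONFIGURATION comes from constants.py):

bench_results = BenchmarkResults()
bench_results.in_memory_txn_results.set_value(RUN_CONFIGURATION, value=run_config)
data = bench_results.in_memory_txn_results.get_data()  # later serialized via json.dump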

File: constants.py (new file)

@ -0,0 +1,56 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
WITH_FINE_GRAINED_AUTHORIZATION = "with_fine_grained_authorization"
WITHOUT_FINE_GRAINED_AUTHORIZATION = "without_fine_grained_authorization"
RUN_CONFIGURATION = "__run_configuration__"
IMPORT = "__import__"
THROUGHPUT = "throughput"
DATABASE = "database"
MEMORY = "memory"
VENDOR = "vendor"
CONDITION = "condition"
NUM_WORKERS_FOR_BENCHMARK = "num_workers_for_benchmark"
SINGLE_THREADED_RUNTIME_SEC = "single_threaded_runtime_sec"
BENCHMARK_MODE = "benchmark_mode"
BENCHMARK_MODE_CONFIG = "benchmark_mode_config"
PLATFORM = "platform"
COUNT = "count"
DURATION = "duration"
NUM_WORKERS = "num_workers"
CPU = "cpu"
MEMORY = "memory"
VENDOR_RUNNER_IMPORT = "import"
VENDOR_RUNNER_AUTHORIZATION = "authorization"
CLIENT = "client"
DATABASE = "database"
CUSTOM_LOAD = "custom_load"
RETRIES = "retries"
DOCKER = "docker"
METADATA = "metadata"
LATENCY_STATS = "latency_stats"
ITERATIONS = "iterations"
USERNAME = "user"
PASSWORD = "test"
DATABASE_CONDITION_HOT = "hot"
DATABASE_CONDITION_VULCANIC = "vulcanic"
WRITE_TYPE_QUERY = "write"
READ_TYPE_QUERY = "read"
UPDATE_TYPE_QUERY = "update"
ANALYTICAL_TYPE_QUERY = "analytical"
QUERY = "query"
CACHE = "cache"
DISK_PREPARATION_RSS = "disk_storage_preparation"
IN_MEMORY_ANALYTICAL_RSS = "in_memory_analytical_preparation"
STORAGE_MODE = "storage_mode"
IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL"
IN_MEMORY_ANALYTICAL = "IN_MEMORY_ANALYTICAL"
ON_DISK_TRANSACTIONAL = "ON_DISK_TRANSACTIONAL"

File: helpers.py

@ -20,10 +20,10 @@ import subprocess
import sys
from pathlib import Path
import log
import workloads
from benchmark_context import BenchmarkContext
from workloads import *
from workloads import base
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -116,7 +116,7 @@ def get_available_workloads(customer_workloads: str = None) -> dict:
if key.startswith("_"):
continue
base_class = getattr(module, key)
if not inspect.isclass(base_class) or not issubclass(base_class, base.Workload):
if not inspect.isclass(base_class) or not issubclass(base_class, workloads.base.Workload):
continue
queries = collections.defaultdict(list)
for funcname in dir(base_class):
@ -137,7 +137,7 @@ def get_available_workloads(customer_workloads: str = None) -> dict:
if key.startswith("_"):
continue
base_class = getattr(dataset_to_use, key)
if not inspect.isclass(base_class) or not issubclass(base_class, base.Workload):
if not inspect.isclass(base_class) or not issubclass(base_class, workloads.base.Workload):
continue
queries = collections.defaultdict(list)
for funcname in dir(base_class):

File: log.py

@ -10,6 +10,23 @@
# licenses/APL.txt.
import logging
from typing import Dict
from constants import (
COUNT,
CPU,
DATABASE,
DOCKER,
DURATION,
IMPORT,
ITERATIONS,
LATENCY_STATS,
MEMORY,
METADATA,
RETRIES,
RUN_CONFIGURATION,
THROUGHPUT,
)
COLOR_GRAY = 0
COLOR_RED = 1

File: runners.py

@ -455,7 +455,6 @@ class Memgraph(BaseRunner):
raise Exception("The database process died prematurely!")
_wait_for_server_socket(self._bolt_port)
ret = self._proc_mg.poll()
assert ret is None, "The database process died prematurely " "({})!".format(ret)
def _cleanup(self):
if self._proc_mg is None:
@ -479,7 +478,6 @@ class Memgraph(BaseRunner):
self._stop_event.set()
self.dump_rss(workload)
ret, usage = self._cleanup()
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
return usage
def start_db(self, workload):
@ -495,7 +493,6 @@ class Memgraph(BaseRunner):
self._stop_event.set()
self.dump_rss(workload)
ret, usage = self._cleanup()
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
return usage
def clean_db(self):
@ -644,7 +641,6 @@ class Neo4j(BaseRunner):
self._rss.clear()
p.start()
# Start DB
self._start()
if self._performance_tracking:
@ -657,7 +653,6 @@ class Neo4j(BaseRunner):
self.dump_rss(workload)
ret, usage = self._cleanup()
self.dump_db(path=self._neo4j_dump.parent)
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
return usage
def start_db(self, workload):
@ -689,7 +684,6 @@ class Neo4j(BaseRunner):
self.get_memory_usage("stop_" + workload)
self.dump_rss(workload)
ret, usage = self._cleanup()
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
return usage
def dump_db(self, path):

File: workloads/base.py

@ -9,8 +9,7 @@
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
from abc import ABC, abstractclassmethod
from pathlib import Path
from abc import ABC
import helpers
from benchmark_context import BenchmarkContext
@ -18,6 +17,10 @@ from benchmark_context import BenchmarkContext
# Base dataset class used as a template to create each individual dataset. All
# common logic is handled here.
# This class is also used for disk-storage workloads. In that case there are two dataset files, one
# for nodes and one for edges; the index file is managed the same way. For a disk-storage workload,
# calling the get_file() method raises an exception; use get_node_file() or get_edge_file() to get
# the correct file path.
class Workload(ABC):
# Name of the workload/dataset.
NAME = ""
@ -28,9 +31,13 @@ class Workload(ABC):
# List of local files that should be used to import the dataset.
LOCAL_FILE = None
LOCAL_FILE_NODES = None
LOCAL_FILE_EDGES = None
# URLs of remote dataset files that should be used to import the dataset, compressed in gz format.
URL_FILE = None
URL_FILE_NODES = None
URL_FILE_EDGES = None
# Index files
LOCAL_INDEX_FILE = None
@ -49,9 +56,11 @@ class Workload(ABC):
generator_prerequisite = "dataset_generator" in cls.__dict__
generator_indexes_prerequisite = "indexes_generator" in cls.__dict__
custom_import_prerequisite = "custom_import" in cls.__dict__
basic_import_prerequisite = ("LOCAL_FILE" in cls.__dict__ or "URL_FILE" in cls.__dict__) and (
"LOCAL_INDEX_FILE" in cls.__dict__ or "URL_INDEX_FILE" in cls.__dict__
)
basic_import_prerequisite = (
"LOCAL_FILE" in cls.__dict__
or "URL_FILE" in cls.__dict__
or ("URL_FILE_NODES" in cls.__dict__ and "URL_FILE_EDGES" in cls.__dict__)
) and ("LOCAL_INDEX_FILE" in cls.__dict__ or "URL_INDEX_FILE" in cls.__dict__)
if not name_prerequisite:
raise ValueError(
@ -87,7 +96,7 @@ class Workload(ABC):
return super().__init_subclass__()
def __init__(self, variant: str = None, benchmark_context: BenchmarkContext = None):
def __init__(self, variant: str = None, benchmark_context: BenchmarkContext = None, disk_workload: bool = False):
"""
Accepts a `variant` variable that indicates which variant
of the dataset should be executed
@ -96,63 +105,110 @@ class Workload(ABC):
self._variant = variant
self._vendor = benchmark_context.vendor_name
self._file = None
self._node_file = None
self._edge_file = None
self._file_index = None
self.disk_workload: bool = disk_workload
if self.NAME == "":
raise ValueError("Give your workload a name, by setting self.NAME")
if variant is None:
variant = self.DEFAULT_VARIANT
if variant not in self.VARIANTS:
raise ValueError("Invalid test variant!")
if (self.LOCAL_FILE and variant not in self.LOCAL_FILE) and (self.URL_FILE and variant not in self.URL_FILE):
raise ValueError("The variant doesn't have a defined URL or LOCAL file path!")
if variant not in self.SIZES:
raise ValueError("The variant doesn't have a defined dataset " "size!")
self._validate_variant_argument()
self._validate_vendor_argument()
if (self.LOCAL_INDEX_FILE and self._vendor not in self.LOCAL_INDEX_FILE) and (
self.URL_INDEX_FILE and self._vendor not in self.URL_INDEX_FILE
):
raise ValueError("Vendor does not have INDEX for dataset!")
self._set_local_files()
self._set_url_files()
if self.LOCAL_FILE is not None:
self._local_file = self.LOCAL_FILE.get(variant, None)
else:
self._local_file = None
if self.URL_FILE is not None:
self._url_file = self.URL_FILE.get(variant, None)
else:
self._url_file = None
if self.LOCAL_INDEX_FILE is not None:
self._local_index = self.LOCAL_INDEX_FILE.get(self._vendor, None)
else:
self._local_index = None
if self.URL_INDEX_FILE is not None:
self._url_index = self.URL_INDEX_FILE.get(self._vendor, None)
else:
self._url_index = None
self._set_local_index_file()
self._set_url_index_file()
self._size = self.SIZES[variant]
if "vertices" in self._size or "edges" in self._size:
self._num_vertices = self._size["vertices"]
self._num_edges = self._size["edges"]
def _validate_variant_argument(self) -> None:
if self._variant is None:
self._variant = self.DEFAULT_VARIANT
if self._variant not in self.VARIANTS:
raise ValueError("Invalid test variant!")
if self._variant not in self.SIZES:
raise ValueError("The variant doesn't have a defined dataset " "size!")
if not self.disk_workload:
if (self.LOCAL_FILE and self._variant not in self.LOCAL_FILE) and (
self.URL_FILE and self._variant not in self.URL_FILE
):
raise ValueError("The variant doesn't have a defined URL or LOCAL file path!")
else:
if (self.LOCAL_FILE_NODES and self._variant not in self.LOCAL_FILE_NODES) and (
self.URL_FILE_NODES and self._variant not in self.URL_FILE_NODES
):
raise ValueError("The variant doesn't have a defined URL or LOCAL file path for nodes!")
if (self.LOCAL_FILE_EDGES and self._variant not in self.LOCAL_FILE_EDGES) and (
self.URL_FILE_EDGES and self._variant not in self.URL_FILE_EDGES
):
raise ValueError("The variant doesn't have a defined URL or LOCAL file path for edges!")
def _validate_vendor_argument(self) -> None:
if (self.LOCAL_INDEX_FILE and self._vendor not in self.LOCAL_INDEX_FILE) and (
self.URL_INDEX_FILE and self._vendor not in self.URL_INDEX_FILE
):
raise ValueError("Vendor does not have INDEX for dataset!")
def _set_local_files(self) -> None:
if not self.disk_workload:
if self.LOCAL_FILE is not None:
self._local_file = self.LOCAL_FILE.get(self._variant, None)
else:
self._local_file = None
else:
if self.LOCAL_FILE_NODES is not None:
self._local_file_nodes = self.LOCAL_FILE_NODES.get(self._variant, None)
else:
self._local_file_nodes = None
if self.LOCAL_FILE_EDGES is not None:
self._local_file_edges = self.LOCAL_FILE_EDGES.get(self._variant, None)
else:
self._local_file_edges = None
def _set_url_files(self) -> None:
if not self.disk_workload:
if self.URL_FILE is not None:
self._url_file = self.URL_FILE.get(self._variant, None)
else:
self._url_file = None
else:
if self.URL_FILE_NODES is not None:
self._url_file_nodes = self.URL_FILE_NODES.get(self._variant, None)
else:
self._url_file_nodes = None
if self.URL_FILE_EDGES is not None:
self._url_file_edges = self.URL_FILE_EDGES.get(self._variant, None)
else:
self._url_file_edges = None
def _set_local_index_file(self) -> None:
if self.LOCAL_INDEX_FILE is not None:
self._local_index = self.LOCAL_INDEX_FILE.get(self._vendor, None)
else:
self._local_index = None
def _set_url_index_file(self) -> None:
if self.URL_INDEX_FILE is not None:
self._url_index = self.URL_INDEX_FILE.get(self._vendor, None)
else:
self._url_index = None
def prepare(self, directory):
if self._local_file is not None:
print("Using local dataset file:", self._local_file)
self._file = self._local_file
elif self._url_file is not None:
cached_input, exists = directory.get_file("dataset.cypher")
if not exists:
print("Downloading dataset file:", self._url_file)
downloaded_file = helpers.download_file(self._url_file, directory.get_path())
print("Unpacking and caching file:", downloaded_file)
helpers.unpack_gz_and_move_file(downloaded_file, cached_input)
print("Using cached dataset file:", cached_input)
self._file = cached_input
if not self.disk_workload:
self._prepare_dataset_for_in_memory_workload(directory)
else:
self._prepare_dataset_for_on_disk_workload(directory)
if self._local_index is not None:
print("Using local index file:", self._local_index)
@ -167,6 +223,55 @@ class Workload(ABC):
print("Using cached index file:", cached_index)
self._file_index = cached_index
def _prepare_dataset_for_on_disk_workload(self, directory):
self._prepare_nodes_for_on_disk_workload(directory)
self._prepare_edges_for_on_disk_workload(directory)
def _prepare_nodes_for_on_disk_workload(self, directory):
if self._local_file_nodes is not None:
print("Using local dataset file for nodes:", self._local_file_nodes)
self._node_file = self._local_file_nodes
elif self._url_file_nodes is not None:
cached_input, exists = directory.get_file("dataset_nodes.cypher")
if not exists:
print("Downloading dataset file for nodes:", self._url_file_nodes)
downloaded_file = helpers.download_file(self._url_file_nodes, directory.get_path())
print("Unpacking and caching file for nodes:", downloaded_file)
helpers.unpack_gz_and_move_file(downloaded_file, cached_input)
print("Using cached dataset file for nodes:", cached_input)
self._node_file = cached_input
def _prepare_edges_for_on_disk_workload(self, directory):
if self._local_file_edges is not None:
print("Using local dataset file for edges:", self._local_file_edges)
self._edge_file = self._local_file_edges
elif self._url_file_edges is not None:
cached_input, exists = directory.get_file("dataset_edges.cypher")
if not exists:
print("Downloading dataset file for edges:", self._url_file_edges)
downloaded_file = helpers.download_file(self._url_file_edges, directory.get_path())
print("Unpacking and caching file for edges:", downloaded_file)
helpers.unpack_gz_and_move_file(downloaded_file, cached_input)
print("Using cached dataset file for edges:", cached_input)
self._edge_file = cached_input
def _prepare_dataset_for_in_memory_workload(self, directory):
if self._local_file is not None:
print("Using local dataset file:", self._local_file)
self._file = self._local_file
elif self._url_file is not None:
cached_input, exists = directory.get_file("dataset.cypher")
if not exists:
print("Downloading dataset file:", self._url_file)
downloaded_file = helpers.download_file(self._url_file, directory.get_path())
print("Unpacking and caching file:", downloaded_file)
helpers.unpack_gz_and_move_file(downloaded_file, cached_input)
print("Using cached dataset file:", cached_input)
self._file = cached_input
def is_disk_workload(self):
return self.disk_workload
def get_variant(self):
"""Returns the current variant of the dataset."""
return self._variant
@ -179,8 +284,25 @@ class Workload(ABC):
"""
Returns path to the file that contains dataset creation queries.
"""
if self.disk_workload:
raise Exception("get_file method should not be called for disk storage")
return self._file
def get_node_file(self):
"""Returns path to the file that contains dataset creation queries for nodes."""
if not self.disk_workload:
raise Exception("get_node_file method should be called only for disk storage")
return self._node_file
def get_edge_file(self):
"""Returns path to the file that contains dataset creation queries for edges."""
if not self.disk_workload:
raise Exception("get_edge_file method should be called only for disk storage")
return self._edge_file
def get_size(self):
"""Returns number of vertices/edges for the current variant."""
return self._size
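A short sketch of the intended call pattern for a disk workload (ctx and cache_dir are placeholders; Pokec is the disk variant defined in the new workload file below):

wl = Pokec(variant="small", benchmark_context=ctx)  # the subclass passes disk_workload=True
wl.prepare(cache_dir)  # fetches and caches the node and edge files
node_file, edge_file = wl.get_node_file(), wl.get_edge_file()
# wl.get_file() would raise here, since disk workloads split the dataset in two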

File: disk Pokec workload (new file)

@ -0,0 +1,449 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import random
from benchmark_context import BenchmarkContext
from workloads.base import Workload
from workloads.importers.disk_importer_pokec import ImporterPokec
class Pokec(Workload):
NAME = "pokec_disk"
VARIANTS = ["small", "medium", "large"]
DEFAULT_VARIANT = "small"
FILE = None
URL_FILE_NODES = {
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_small_import_nodes.cypher",
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_medium_import_nodes.cypher",
"large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_large_nodes.setup.cypher.gz",
}
URL_FILE_EDGES = {
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_small_import_edges.cypher",
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_medium_import_edges.cypher",
"large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_large_edges.setup.cypher.gz",
}
SIZES = {
"small": {"vertices": 10000, "edges": 121716},
"medium": {"vertices": 100000, "edges": 1768515},
"large": {"vertices": 1632803, "edges": 30622564},
}
URL_INDEX_FILE = {
"memgraph": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/memgraph.cypher",
"neo4j": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/neo4j.cypher",
}
PROPERTIES_ON_EDGES = False
def __init__(self, variant: str = None, benchmark_context: BenchmarkContext = None):
super().__init__(variant, benchmark_context=benchmark_context, disk_workload=True)
def custom_import(self) -> bool:
importer = ImporterPokec(
benchmark_context=self.benchmark_context,
dataset_name=self.NAME,
index_file=self._file_index,
dataset_nodes_file=self._node_file,
dataset_edges_file=self._edge_file,
variant=self._variant,
)
return importer.execute_import()
# Helpers used to generate the queries
def _get_random_vertex(self):
# All vertices in the Pokec dataset have an ID in the range
# [1, _num_vertices].
return random.randint(1, self._num_vertices)
def _get_random_from_to(self):
vertex_from = self._get_random_vertex()
vertex_to = vertex_from
while vertex_to == vertex_from:
vertex_to = self._get_random_vertex()
return (vertex_from, vertex_to)
# Arango benchmarks
# OK
def benchmark__arango__single_vertex_read(self):
return ("MATCH (n:User {id : $id}) RETURN n", {"id": self._get_random_vertex()})
# OK
def benchmark__arango__single_vertex_write(self):
return (
"CREATE (n:UserTemp {id : $id}) RETURN n",
{"id": random.randint(1, self._num_vertices * 10)},
)
# OK
def benchmark__arango__single_edge_write(self):
vertex_from, vertex_to = self._get_random_from_to()
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "CREATE (n)-[e:Temp]->(m) RETURN e",
{"from": vertex_from, "to": vertex_to},
)
# OK
def benchmark__arango__aggregate(self):
return ("MATCH (n:User) RETURN n.age, COUNT(*)", {})
# OK
def benchmark__arango__aggregate_with_distinct(self):
return ("MATCH (n:User) RETURN COUNT(DISTINCT n.age)", {})
# OK
def benchmark__arango__aggregate_with_filter(self):
return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {})
# NOT OK
# def benchmark__arango__expansion_1(self):
# return (
# "MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__expansion_1_with_filter(self):
# return (
# "MATCH (s:User {id: $id})-->(n:User) " "WHERE n.age >= 18 " "RETURN n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__expansion_2(self):
# return (
# "MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__expansion_2_with_filter(self):
# return (
# "MATCH (s:User {id: $id})-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__expansion_3(self):
# return (
# "MATCH (s:User {id: $id})-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__expansion_3_with_filter(self):
# return (
# "MATCH (s:User {id: $id})-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__expansion_4(self):
# return (
# "MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__expansion_4_with_filter(self):
# return (
# "MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__neighbours_2(self):
# return (
# "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__neighbours_2_with_filter(self):
# return (
# "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__neighbours_2_with_data(self):
# return (
# "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id, n",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__neighbours_2_with_data_and_filter(self):
# return (
# "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id, n",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__arango__shortest_path(self):
# vertex_from, vertex_to = self._get_random_from_to()
# return (
# "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
# "MATCH p=(n)-[*bfs..15]->(m) "
# "RETURN extract(n in nodes(p) | n.id) AS path",
# {"from": vertex_from, "to": vertex_to},
# )
# NOT OK
# def benchmark__arango__shortest_path_with_filter(self):
# vertex_from, vertex_to = self._get_random_from_to()
# return (
# "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
# "MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) "
# "RETURN extract(n in nodes(p) | n.id) AS path",
# {"from": vertex_from, "to": vertex_to},
# )
# OK
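# Memgraph-specific syntax: "*allshortest 2 (r, n | 1) total_weight" expands
# all shortest paths up to depth 2, the lambda "(r, n | 1)" assigns a uniform
# weight of 1 to every expansion, and the accumulated path weight is bound to
# total_weight.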
def benchmark__arango__allshortest_paths(self):
vertex_from, vertex_to = self._get_random_from_to()
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*allshortest 2 (r, n | 1) total_weight]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to},
)
# Our benchmark queries
# OK
def benchmark__create__edge(self):
vertex_from, vertex_to = self._get_random_from_to()
return (
"MATCH (a:User {id: $from}), (b:User {id: $to}) " "CREATE (a)-[:TempEdge]->(b)",
{"from": vertex_from, "to": vertex_to},
)
# OK
def benchmark__create__pattern(self):
return ("CREATE ()-[:TempEdge]->()", {})
# OK
def benchmark__create__vertex(self):
return ("CREATE ()", {})
# OK
def benchmark__create__vertex_big(self):
return (
"CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, "
'p3: "Here is some text that is not extremely short", '
'p4:"Short text", p5: 234.434, p6: 11.11, p7: false})',
{},
)
# OK
def benchmark__aggregation__count(self):
return ("MATCH (n) RETURN count(n), count(n.age)", {})
# OK
def benchmark__aggregation__min_max_avg(self):
return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {})
# NOT OK
# def benchmark__match__pattern_cycle(self):
# return (
# "MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__match__pattern_long(self):
# return (
# "MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->" "(n3)-[e3]->(n4)<-[e4]-(n5) " "RETURN n5 LIMIT 1",
# {"id": self._get_random_vertex()},
# )
# OK
def benchmark__match__pattern_short(self):
return (
"MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1",
{"id": self._get_random_vertex()},
)
# OK
def benchmark__match__vertex_on_label_property(self):
return (
"MATCH (n:User) WITH n WHERE n.id = $id RETURN n",
{"id": self._get_random_vertex()},
)
# OK
def benchmark__match__vertex_on_label_property_index(self):
return ("MATCH (n:User {id: $id}) RETURN n", {"id": self._get_random_vertex()})
# OK
def benchmark__match__vertex_on_property(self):
return ("MATCH (n:User {id: $id}) RETURN n", {"id": self._get_random_vertex()})
# OK
def benchmark__update__vertex_on_property(self):
return (
"MATCH (n {id: $id}) SET n.property = -1",
{"id": self._get_random_vertex()},
)
# Basic benchmark queries
# OK
def benchmark__basic__aggregate_aggregate(self):
return ("MATCH (n:User) RETURN n.age, COUNT(*)", {})
# OK
def benchmark__basic__aggregate_count_aggregate(self):
return ("MATCH (n) RETURN count(n), count(n.age)", {})
# OK
def benchmark__basic__aggregate_with_filter_aggregate(self):
return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {})
# OK
def benchmark__basic__min_max_avg_aggregate(self):
return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {})
# NOT OK
# def benchmark__basic__expansion_1_analytical(self):
# return (
# "MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__expansion_1_with_filter_analytical(self):
# return (
# "MATCH (s:User {id: $id})-->(n:User) " "WHERE n.age >= 18 " "RETURN n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__expansion_2_analytical(self):
# return (
# "MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__expansion_2_with_filter_analytical(self):
# return (
# "MATCH (s:User {id: $id})-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__expansion_3_analytical(self):
# return (
# "MATCH (s:User {id: $id})-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__expansion_3_with_filter_analytical(self):
# return (
# "MATCH (s:User {id: $id})-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__expansion_4_analytical(self):
# return (
# "MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__expansion_4_with_filter_analytical(self):
# return (
# "MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__neighbours_2_analytical(self):
# return (
# "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__neighbours_2_with_filter_analytical(self):
# return (
# "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__neighbours_2_with_data_analytical(self):
# return (
# "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id, n",
# {"id": self._get_random_vertex()},
# )
# NOT OK
# def benchmark__basic__neighbours_2_with_data_and_filter_analytical(self):
# return (
# "MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id, n",
# {"id": self._get_random_vertex()},
# )
# OK
def benchmark__basic__pattern_cycle_analytical(self):
return (
"MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2",
{"id": self._get_random_vertex()},
)
# OK
def benchmark__basic__pattern_long_analytical(self):
return (
"MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->" "(n3)-[e3]->(n4)<-[e4]-(n5) " "RETURN n5 LIMIT 1",
{"id": self._get_random_vertex()},
)
# OK
def benchmark__basic__pattern_short_analytical(self):
return (
"MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1",
{"id": self._get_random_vertex()},
)
# OK
def benchmark__basic__single_vertex_property_update_update(self):
return (
"MATCH (n:User {id: $id}) SET n.property = -1",
{"id": self._get_random_vertex()},
)
# OK
def benchmark__basic__single_vertex_read_read(self):
return ("MATCH (n:User {id : $id}) RETURN n", {"id": self._get_random_vertex()})
# OK
def benchmark__basic__single_vertex_write_write(self):
return (
"CREATE (n:UserTemp {id : $id}) RETURN n",
{"id": random.randint(1, self._num_vertices * 10)},
)
# OK
def benchmark__basic__single_edge_write_write(self):
vertex_from, vertex_to = self._get_random_from_to()
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "CREATE (n)-[e:Temp]->(m) RETURN e",
{"from": vertex_from, "to": vertex_to},
)

View File

@ -0,0 +1,62 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
from pathlib import Path
import log
from benchmark_context import BenchmarkContext
from constants import *
from runners import BaseRunner
class ImporterPokec:
def __init__(
self,
benchmark_context: BenchmarkContext,
dataset_name: str,
variant: str,
index_file: str,
dataset_nodes_file: str,
dataset_edges_file: str,
) -> None:
self._benchmark_context = benchmark_context
self._dataset_name = dataset_name
self._variant = variant
self._index_file = index_file
self._dataset_nodes_file = dataset_nodes_file
self._dataset_edges_file = dataset_edges_file
def execute_import(self):
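# Returns True when this importer fully handles the import (currently only for
# the neo4j vendor, via a cached dump or the CYPHERL node/edge files); False
# tells the caller to fall back to the vendor's default import path.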
if self._benchmark_context.vendor_name == "neo4j":
neo4j_dump = Path() / ".cache" / "datasets" / self._dataset_name / self._variant / "neo4j.dump"
vendor_runner = BaseRunner.create(
benchmark_context=self._benchmark_context,
)
vendor_runner.clean_db()
if neo4j_dump.exists():
log.log("Loading database from existing dump...")
vendor_runner.load_db_from_dump(path=neo4j_dump.parent)
else:
client = vendor_runner.fetch_client()
vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT)
log.log("Executing database index setup")
client.execute(file_path=self._index_file, num_workers=1)
log.log("Started importing dataset")
client.execute(
file_path=self._dataset_nodes_file, num_workers=self._benchmark_context.num_workers_for_import
)
client.execute(
file_path=self._dataset_edges_file, num_workers=self._benchmark_context.num_workers_for_import
)
vendor_runner.stop_db_init(VENDOR_RUNNER_IMPORT)
return True
else:
return False

View File

@ -20,9 +20,7 @@ GITHUB_REPOSITORY = os.getenv("GITHUB_REPOSITORY", "")
GITHUB_SHA = os.getenv("GITHUB_SHA", "")
GITHUB_REF = os.getenv("GITHUB_REF", "")
BENCH_GRAPH_SERVER_ENDPOINT = os.getenv(
"BENCH_GRAPH_SERVER_ENDPOINT",
"http://bench-graph-api:9001")
BENCH_GRAPH_SERVER_ENDPOINT = os.getenv("BENCH_GRAPH_SERVER_ENDPOINT", "http://bench-graph-api:9001")
log = logging.getLogger(__name__)
@ -30,7 +28,9 @@ log = logging.getLogger(__name__)
def parse_args():
argp = ArgumentParser(description=__doc__)
argp.add_argument("--benchmark-name", type=str, required=True)
argp.add_argument("--benchmark-results-path", type=str, required=True)
argp.add_argument("--benchmark-results", type=str, required=True)
argp.add_argument("--benchmark-results-in-memory-analytical-path", type=str, required=False)
argp.add_argument("--benchmark-results-on-disk-txn-path", type=str, required=False)
argp.add_argument("--github-run-id", type=int, required=True)
argp.add_argument("--github-run-number", type=int, required=True)
argp.add_argument("--head-branch-name", type=str, required=True)
@ -38,26 +38,45 @@ def parse_args():
def post_measurement(args):
with open(args.benchmark_results_path, "r") as f:
data = json.load(f)
timestamp = datetime.now().timestamp()
req = requests.post(
f"{BENCH_GRAPH_SERVER_ENDPOINT}/measurements",
json={
"name": args.benchmark_name,
"timestamp": timestamp,
"git_repo": GITHUB_REPOSITORY,
"git_ref": GITHUB_REF,
"git_sha": GITHUB_SHA,
"github_run_id": args.github_run_id,
"github_run_number": args.github_run_number,
"results": data,
"git_branch": args.head_branch_name},
timeout=1)
assert req.status_code == 200, \
f"Uploading {args.benchmark_name} data failed."
log.info(f"{args.benchmark_name} data sent to "
f"{BENCH_GRAPH_SERVER_ENDPOINT}")
timestamp = datetime.now().timestamp()
with open(args.benchmark_results, "r") as in_memory_txn_file:
in_memory_txn_data = json.load(in_memory_txn_file)
in_memory_analytical_data = None
if args.benchmark_results_in_memory_analytical_path is not None:
try:
with open(args.benchmark_results_in_memory_analytical_path, "r") as in_memory_analytical_file:
in_memory_analytical_data = json.load(in_memory_analytical_file)
except IOError:
log.error(f"Failed to load {args.benchmark_results_in_memory_analytical_path}.")
on_disk_txn_data = None
if args.benchmark_results_on_disk_txn_path is not None:
try:
with open(args.benchmark_results_on_disk_txn_path, "r") as on_disk_txn_file:
on_disk_txn_data = json.load(on_disk_txn_file)
except IOError:
log.error(f"Failed to load {args.benchmark_results_on_disk_txn_path}.")
req = requests.post(
f"{BENCH_GRAPH_SERVER_ENDPOINT}/measurements",
json={
"name": args.benchmark_name,
"timestamp": timestamp,
"git_repo": GITHUB_REPOSITORY,
"git_ref": GITHUB_REF,
"git_sha": GITHUB_SHA,
"github_run_id": args.github_run_id,
"github_run_number": args.github_run_number,
"results": in_memory_txn_data,
"in_memory_analytical_results": in_memory_analytical_data,
"on_disk_txn_results": on_disk_txn_data,
"git_branch": args.head_branch_name,
},
timeout=1,
)
assert req.status_code == 200, f"Uploading {args.benchmark_name} data failed."
log.info(f"{args.benchmark_name} data sent to " f"{BENCH_GRAPH_SERVER_ENDPOINT}")
if __name__ == "__main__":