Merge pull request #593 from memgraph/T1079-MG-add-simple-query-to-benchmark_v2

Add new dataset for mgbench
This commit is contained in:
János Benjamin Antal 2022-11-09 16:21:56 +01:00 committed by GitHub
commit 5c0e41ed44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 593 additions and 222 deletions

View File

@ -198,6 +198,15 @@ jobs:
cd build
ctest -R memgraph__simulation --output-on-failure -j$THREADS
- name: Run single benchmark test
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run simulation tests.
cd tests/mgbench
./benchmark.py accesscontrol/small --num-workers-for-import 1 --test-system-arg "split-file splitfiles/accesscontrol_small.shard_configuration bolt-num-workers 1"
release_build:
name: "Release build"
runs-on: [self-hosted, Linux, X64, Diff]
@ -245,6 +254,15 @@ jobs:
cd build
ctest -R memgraph__simulation --output-on-failure -j$THREADS
- name: Run single benchmark test
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run simulation tests.
cd tests/mgbench
./benchmark.py accesscontrol/small --num-workers-for-import 1 --test-system-arg "split-file splitfiles/accesscontrol_small.shard_configuration bolt-num-workers 1"
- name: Run e2e tests
run: |
# TODO(gitbuda): Setup mgclient and pymgclient properly.

View File

@ -15,15 +15,18 @@ import argparse
import collections
import copy
import fnmatch
import importlib
import inspect
import json
import multiprocessing
import os
import random
import sys
import time
import datasets
import log
import helpers
import log
import runners
@ -37,8 +40,7 @@ def get_queries(gen, count):
return ret
def match_patterns(dataset, variant, group, test, is_default_variant,
patterns):
def match_patterns(dataset, variant, group, test, is_default_variant, patterns):
for pattern in patterns:
verdict = [fnmatch.fnmatchcase(dataset, pattern[0])]
if pattern[1] != "":
@ -58,7 +60,7 @@ def filter_benchmarks(generators, patterns):
pattern = patterns[i].split("/")
if len(pattern) > 4 or len(pattern) == 0:
raise Exception("Invalid benchmark description '" + pattern + "'!")
pattern.extend(["", "*", "*"][len(pattern) - 1:])
pattern.extend(["", "*", "*"][len(pattern) - 1 :])
patterns[i] = pattern
filtered = []
for dataset in sorted(generators.keys()):
@ -68,8 +70,7 @@ def filter_benchmarks(generators, patterns):
current = collections.defaultdict(list)
for group in tests:
for test_name, test_func in tests[group]:
if match_patterns(dataset, variant, group, test_name,
is_default_variant, patterns):
if match_patterns(dataset, variant, group, test_name, is_default_variant, patterns):
current[group].append((test_name, test_func))
if len(current) > 0:
filtered.append((generator(variant), dict(current)))
@ -78,54 +79,64 @@ def filter_benchmarks(generators, patterns):
# Parse options.
parser = argparse.ArgumentParser(
description="Memgraph benchmark executor.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("benchmarks", nargs="*", default="",
help="descriptions of benchmarks that should be run; "
"multiple descriptions can be specified to run multiple "
"benchmarks; the description is specified as "
"dataset/variant/group/test; Unix shell-style wildcards "
"can be used in the descriptions; variant, group and test "
"are optional and they can be left out; the default "
"variant is '' which selects the default dataset variant; "
"the default group is '*' which selects all groups; the "
"default test is '*' which selects all tests")
parser.add_argument("--memgraph-binary",
default=helpers.get_binary_path("memgraph"),
help="Memgraph binary used for benchmarking")
parser.add_argument("--client-binary",
default=helpers.get_binary_path("tests/mgbench/client"),
help="client binary used for benchmarking")
parser.add_argument("--num-workers-for-import", type=int,
default=multiprocessing.cpu_count() // 2,
help="number of workers used to import the dataset")
parser.add_argument("--num-workers-for-benchmark", type=int,
default=1,
help="number of workers used to execute the benchmark")
parser.add_argument("--single-threaded-runtime-sec", type=int,
default=10,
help="single threaded duration of each test")
parser.add_argument("--no-load-query-counts", action="store_true",
help="disable loading of cached query counts")
parser.add_argument("--no-save-query-counts", action="store_true",
help="disable storing of cached query counts")
parser.add_argument("--export-results", default="",
help="file path into which results should be exported")
parser.add_argument("--temporary-directory", default="/tmp",
help="directory path where temporary data should "
"be stored")
parser.add_argument("--no-properties-on-edges", action="store_true",
help="disable properties on edges")
description="Memgraph benchmark executor.", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"benchmarks",
nargs="*",
default="",
help="descriptions of benchmarks that should be run; "
"multiple descriptions can be specified to run multiple "
"benchmarks; the description is specified as "
"dataset/variant/group/test; Unix shell-style wildcards "
"can be used in the descriptions; variant, group and test "
"are optional and they can be left out; the default "
"variant is '' which selects the default dataset variant; "
"the default group is '*' which selects all groups; the "
"default test is '*' which selects all tests",
)
parser.add_argument(
"--memgraph-binary", default=helpers.get_binary_path("memgraph"), help="Memgraph binary used for benchmarking"
)
parser.add_argument(
"--client-binary",
default=helpers.get_binary_path("tests/mgbench/client"),
help="client binary used for benchmarking",
)
parser.add_argument(
"--num-workers-for-import",
type=int,
default=multiprocessing.cpu_count() // 2,
help="number of workers used to import the dataset",
)
parser.add_argument(
"--num-workers-for-benchmark", type=int, default=1, help="number of workers used to execute the benchmark"
)
parser.add_argument("--single-threaded-runtime-sec", type=int, default=10, help="single threaded duration of each test")
parser.add_argument("--no-load-query-counts", action="store_true", help="disable loading of cached query counts")
parser.add_argument("--no-save-query-counts", action="store_true", help="disable storing of cached query counts")
parser.add_argument("--export-results", default="", help="file path into which results should be exported")
parser.add_argument(
"--temporary-directory", default="/tmp", help="directory path where temporary data should " "be stored"
)
parser.add_argument("--no-properties-on-edges", action="store_true", help="disable properties on edges")
parser.add_argument("--datasets-path", default="datasets", help="path to datasets to scan")
parser.add_argument("--test-system-args", default="")
args = parser.parse_args()
head_tail = os.path.split(args.datasets_path)
path_without_dataset_name = head_tail[0]
dataset_name = head_tail[1].split(".")[0]
sys.path.append(path_without_dataset_name)
dataset_to_use = importlib.import_module(dataset_name)
# Detect available datasets.
generators = {}
for key in dir(datasets):
for key in dir(dataset_to_use):
if key.startswith("_"):
continue
dataset = getattr(datasets, key)
if not inspect.isclass(dataset) or dataset == datasets.Dataset or \
not issubclass(dataset, datasets.Dataset):
dataset = getattr(dataset_to_use, key)
if not inspect.isclass(dataset) or dataset == datasets.Dataset or not issubclass(dataset, datasets.Dataset):
continue
tests = collections.defaultdict(list)
for funcname in dir(dataset):
@ -135,8 +146,9 @@ for key in dir(datasets):
tests[group].append((test, funcname))
generators[dataset.NAME] = (dataset, dict(tests))
if dataset.PROPERTIES_ON_EDGES and args.no_properties_on_edges:
raise Exception("The \"{}\" dataset requires properties on edges, "
"but you have disabled them!".format(dataset.NAME))
raise Exception(
'The "{}" dataset requires properties on edges, ' "but you have disabled them!".format(dataset.NAME)
)
# List datasets if there is no specified dataset.
if len(args.benchmarks) == 0:
@ -144,8 +156,7 @@ if len(args.benchmarks) == 0:
for name in sorted(generators.keys()):
print("Dataset:", name)
dataset, tests = generators[name]
print(" Variants:", ", ".join(dataset.VARIANTS),
"(default: " + dataset.DEFAULT_VARIANT + ")")
print(" Variants:", ", ".join(dataset.VARIANTS), "(default: " + dataset.DEFAULT_VARIANT + ")")
for group in sorted(tests.keys()):
print(" Group:", group)
for test_name, test_func in tests[group]:
@ -165,31 +176,44 @@ benchmarks = filter_benchmarks(generators, args.benchmarks)
# Run all specified benchmarks.
for dataset, tests in benchmarks:
log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(),
"dataset")
dataset.prepare(cache.cache_directory("datasets", dataset.NAME,
dataset.get_variant()))
log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(), "dataset")
dataset.prepare(cache.cache_directory("datasets", dataset.NAME, dataset.get_variant()))
# Prepare runners and import the dataset.
memgraph = runners.Memgraph(args.memgraph_binary, args.temporary_directory,
not args.no_properties_on_edges)
memgraph = runners.Memgraph(
args.memgraph_binary,
args.temporary_directory,
not args.no_properties_on_edges,
args.test_system_args,
)
client = runners.Client(args.client_binary, args.temporary_directory)
memgraph.start_preparation()
ret = client.execute(file_path=dataset.get_file(),
num_workers=args.num_workers_for_import)
time.sleep(5.0) # giving enough time to machine manager and all to start up
ret = client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
usage = memgraph.stop()
# Display import statistics.
print()
for row in ret:
print("Executed", row["count"], "queries in", row["duration"],
"seconds using", row["num_workers"],
"workers with a total throughput of", row["throughput"],
"queries/second.")
print(
"Executed",
row["count"],
"queries in",
row["duration"],
"seconds using",
row["num_workers"],
"workers with a total throughput of",
row["throughput"],
"queries/second.",
)
print()
print("The database used", usage["cpu"],
"seconds of CPU time and peaked at",
usage["memory"] / 1024 / 1024, "MiB of RAM.")
print(
"The database used",
usage["cpu"],
"seconds of CPU time and peaked at",
usage["memory"] / 1024 / 1024,
"MiB of RAM.",
)
# Save import results.
import_key = [dataset.NAME, dataset.get_variant(), "__import__"]
@ -208,24 +232,26 @@ for dataset, tests in benchmarks:
config_key = [dataset.NAME, dataset.get_variant(), group, test]
cached_count = config.get_value(*config_key)
if cached_count is None:
print("Determining the number of queries necessary for",
args.single_threaded_runtime_sec,
"seconds of single-threaded runtime...")
print(
"Determining the number of queries necessary for",
args.single_threaded_runtime_sec,
"seconds of single-threaded runtime...",
)
# First run to prime the query caches.
memgraph.start_benchmark()
client.execute(queries=get_queries(func, 1), num_workers=1)
# Get a sense of the runtime.
count = 1
while True:
ret = client.execute(queries=get_queries(func, count),
num_workers=1)
ret = client.execute(queries=get_queries(func, count), num_workers=1)
duration = ret[0]["duration"]
should_execute = int(args.single_threaded_runtime_sec /
(duration / count))
print("executed_queries={}, total_duration={}, "
"query_duration={}, estimated_count={}".format(
count, duration, duration / count,
should_execute))
should_execute = int(args.single_threaded_runtime_sec / (duration / count))
print(
"executed_queries={}, total_duration={}, "
"query_duration={}, estimated_count={}".format(
count, duration, duration / count, should_execute
)
)
# We don't have to execute the next iteration when
# `should_execute` becomes the same order of magnitude as
# `count * 10`.
@ -235,45 +261,45 @@ for dataset, tests in benchmarks:
else:
count = count * 10
memgraph.stop()
config.set_value(*config_key, value={
"count": count,
"duration": args.single_threaded_runtime_sec})
config.set_value(*config_key, value={"count": count, "duration": args.single_threaded_runtime_sec})
else:
print("Using cached query count of", cached_count["count"],
"queries for", cached_count["duration"],
"seconds of single-threaded runtime.")
count = int(cached_count["count"] *
args.single_threaded_runtime_sec /
cached_count["duration"])
print(
"Using cached query count of",
cached_count["count"],
"queries for",
cached_count["duration"],
"seconds of single-threaded runtime.",
)
count = int(cached_count["count"] * args.single_threaded_runtime_sec / cached_count["duration"])
# Benchmark run.
print("Sample query:", get_queries(func, 1)[0][0])
print("Executing benchmark with", count, "queries that should "
"yield a single-threaded runtime of",
args.single_threaded_runtime_sec, "seconds.")
print("Queries are executed using", args.num_workers_for_benchmark,
"concurrent clients.")
print(
"Executing benchmark with",
count,
"queries that should yield a single-threaded runtime of",
args.single_threaded_runtime_sec,
"seconds.",
)
print("Queries are executed using", args.num_workers_for_benchmark, "concurrent clients.")
memgraph.start_benchmark()
ret = client.execute(queries=get_queries(func, count),
num_workers=args.num_workers_for_benchmark)[0]
ret = client.execute(queries=get_queries(func, count), num_workers=args.num_workers_for_benchmark)[0]
usage = memgraph.stop()
ret["database"] = usage
# Output summary.
print()
print("Executed", ret["count"], "queries in",
ret["duration"], "seconds.")
print("Executed", ret["count"], "queries in", ret["duration"], "seconds.")
print("Queries have been retried", ret["retries"], "times.")
print("Database used {:.3f} seconds of CPU time.".format(
usage["cpu"]))
print("Database peaked at {:.3f} MiB of memory.".format(
usage["memory"] / 1024.0 / 1024.0))
print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min",
"avg", "max"))
print("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
print("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
metadata = ret["metadata"]
for key in sorted(metadata.keys()):
print("{name:>30}: {minimum:>20.06f} {average:>20.06f} "
"{maximum:>20.06f}".format(name=key, **metadata[key]))
print(
"{name:>30}: {minimum:>20.06f} {average:>20.06f} "
"{maximum:>20.06f}".format(name=key, **metadata[key])
)
log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
# Save results.

View File

@ -0,0 +1,118 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import argparse
import random
import helpers
# Explaination of datasets:
# - empty_only_index: contains index; contains no data
# - small: contains index; contains data (small dataset)
#
# Datamodel is as follow:
#
# ┌──────────────┐
# │ Permission │
# ┌────────────────┐ │ Schema:uuid │ ┌────────────┐
# │:IS_FOR_IDENTITY├────┤ Index:name ├───┤:IS_FOR_FILE│
# └┬───────────────┘ └──────────────┘ └────────────┤
# │ │
# ┌──────▼──────────────┐ ┌──▼────────────────┐
# │ Identity │ │ File │
# │ Schema:uuid │ │ Schema:uuid │
# │ Index:email │ │ Index:name │
# └─────────────────────┘ │ Index:platformId │
# └───────────────────┘
#
# - File: attributes: ["uuid", "name", "platformId"]
# - Permission: attributes: ["uuid", "name"]
# - Identity: attributes: ["uuid", "email"]
#
# Indexes:
# - File: [File(uuid), File(platformId), File(name)]
# - Permission: [Permission(uuid), Permission(name)]
# - Identity: [Identity(uuid), Identity(email)]
#
# Edges:
# - (:Permission)-[:IS_FOR_FILE]->(:File)
# - (:Permission)-[:IS_FOR_IDENTITYR]->(:Identity)
#
# AccessControl specific: uuid is the schema
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--number_of_identities", type=int, default=10)
parser.add_argument("--number_of_files", type=int, default=10)
parser.add_argument("--percentage_of_permissions", type=float, default=1.0)
parser.add_argument("--filename", default="dataset.cypher")
args = parser.parse_args()
number_of_identities = args.number_of_identities
number_of_files = args.number_of_files
percentage_of_permissions = args.percentage_of_permissions
filename = args.filename
assert number_of_identities >= 0
assert number_of_files >= 0
assert percentage_of_permissions > 0.0 and percentage_of_permissions <= 1.0
assert filename != ""
with open(filename, "w") as f:
f.write("MATCH (n) DETACH DELETE n;\n")
# Create the indexes
f.write("CREATE INDEX ON :File;\n")
f.write("CREATE INDEX ON :Permission;\n")
f.write("CREATE INDEX ON :Identity;\n")
f.write("CREATE INDEX ON :File(platformId);\n")
f.write("CREATE INDEX ON :File(name);\n")
f.write("CREATE INDEX ON :Permission(name);\n")
f.write("CREATE INDEX ON :Identity(email);\n")
# Create extra index: in distributed, this will be the schema
f.write("CREATE INDEX ON :File(uuid);\n")
f.write("CREATE INDEX ON :Permission(uuid);\n")
f.write("CREATE INDEX ON :Identity(uuid);\n")
uuid = 1
# Create the nodes File
for index in range(0, number_of_files):
f.write(f'CREATE (:File {{uuid: {uuid}, platformId: "platform_id", name: "name_file_{uuid}"}});\n')
uuid += 1
identities = []
# Create the nodes Identity
for index in range(0, number_of_identities):
f.write(f'CREATE (:Identity {{uuid: {uuid}, name: "mail_{uuid}@something.com"}});\n')
uuid += 1
for outer_index in range(0, number_of_files):
for inner_index in range(0, number_of_identities):
file_uuid = outer_index + 1
identity_uuid = number_of_files + inner_index + 1
if random.random() <= percentage_of_permissions:
f.write(f'CREATE (:Permission {{uuid: {uuid}, name: "name_permission_{uuid}"}});\n')
f.write(
f"MATCH (permission:Permission {{uuid: {uuid}}}), (file:File {{uuid: {file_uuid}}}) CREATE (permission)-[e: IS_FOR_FILE]->(file);\n"
)
f.write(
f"MATCH (permission:Permission {{uuid: {uuid}}}), (identity:Identity {{uuid: {identity_uuid}}}) CREATE (permission)-[e: IS_FOR_IDENTITY]->(identity);\n"
)
uuid += 1
if __name__ == "__main__":
main()

View File

@ -45,13 +45,10 @@ class Dataset:
variant = self.DEFAULT_VARIANT
if variant not in self.VARIANTS:
raise ValueError("Invalid test variant!")
if (self.FILES and variant not in self.FILES) and \
(self.URLS and variant not in self.URLS):
raise ValueError("The variant doesn't have a defined URL or "
"file path!")
if (self.FILES and variant not in self.FILES) and (self.URLS and variant not in self.URLS):
raise ValueError("The variant doesn't have a defined URL or " "file path!")
if variant not in self.SIZES:
raise ValueError("The variant doesn't have a defined dataset "
"size!")
raise ValueError("The variant doesn't have a defined dataset " "size!")
self._variant = variant
if self.FILES is not None:
self._file = self.FILES.get(variant, None)
@ -63,8 +60,7 @@ class Dataset:
self._url = None
self._size = self.SIZES[variant]
if "vertices" not in self._size or "edges" not in self._size:
raise ValueError("The size defined for this variant doesn't "
"have the number of vertices and/or edges!")
raise ValueError("The size defined for this variant doesn't " "have the number of vertices and/or edges!")
self._num_vertices = self._size["vertices"]
self._num_edges = self._size["edges"]
@ -76,8 +72,7 @@ class Dataset:
cached_input, exists = directory.get_file("dataset.cypher")
if not exists:
print("Downloading dataset file:", self._url)
downloaded_file = helpers.download_file(
self._url, directory.get_path())
downloaded_file = helpers.download_file(self._url, directory.get_path())
print("Unpacking and caching file:", downloaded_file)
helpers.unpack_and_move_file(downloaded_file, cached_input)
print("Using cached dataset file:", cached_input)
@ -109,9 +104,9 @@ class Pokec(Dataset):
DEFAULT_VARIANT = "small"
FILES = None
URLS = {
"small": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_small.setup.cypher",
"medium": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_medium.setup.cypher",
"large": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_large.setup.cypher.gz",
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/pokec_small.setup.cypher",
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/pokec_medium.setup.cypher",
"large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/pokec_large.setup.cypher.gz",
}
SIZES = {
"small": {"vertices": 10000, "edges": 121716},
@ -137,18 +132,17 @@ class Pokec(Dataset):
# Arango benchmarks
def benchmark__arango__single_vertex_read(self):
return ("MATCH (n:User {id : $id}) RETURN n",
{"id": self._get_random_vertex()})
return ("MATCH (n:User {id : $id}) RETURN n", {"id": self._get_random_vertex()})
def benchmark__arango__single_vertex_write(self):
return ("CREATE (n:UserTemp {id : $id}) RETURN n",
{"id": random.randint(1, self._num_vertices * 10)})
return ("CREATE (n:UserTemp {id : $id}) RETURN n", {"id": random.randint(1, self._num_vertices * 10)})
def benchmark__arango__single_edge_write(self):
vertex_from, vertex_to = self._get_random_from_to()
return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"CREATE (n)-[e:Temp]->(m) RETURN e",
{"from": vertex_from, "to": vertex_to})
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "CREATE (n)-[e:Temp]->(m) RETURN e",
{"from": vertex_from, "to": vertex_to},
)
def benchmark__arango__aggregate(self):
return ("MATCH (n:User) RETURN n.age, COUNT(*)", {})
@ -157,92 +151,94 @@ class Pokec(Dataset):
return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {})
def benchmark__arango__expansion_1(self):
return ("MATCH (s:User {id: $id})-->(n:User) "
"RETURN n.id",
{"id": self._get_random_vertex()})
return ("MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id", {"id": self._get_random_vertex()})
def benchmark__arango__expansion_1_with_filter(self):
return ("MATCH (s:User {id: $id})-->(n:User) "
"WHERE n.age >= 18 "
"RETURN n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->(n:User) " "WHERE n.age >= 18 " "RETURN n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_2(self):
return ("MATCH (s:User {id: $id})-->()-->(n:User) "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return ("MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id", {"id": self._get_random_vertex()})
def benchmark__arango__expansion_2_with_filter(self):
return ("MATCH (s:User {id: $id})-->()-->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_3(self):
return ("MATCH (s:User {id: $id})-->()-->()-->(n:User) "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_3_with_filter(self):
return ("MATCH (s:User {id: $id})-->()-->()-->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_4(self):
return ("MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_4_with_filter(self):
return ("MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__neighbours_2(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id", {"id": self._get_random_vertex()})
def benchmark__arango__neighbours_2_with_filter(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__neighbours_2_with_data(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
"RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()},
)
def benchmark__arango__neighbours_2_with_data_and_filter(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()},
)
def benchmark__arango__shortest_path(self):
vertex_from, vertex_to = self._get_random_from_to()
return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*bfs..15]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to})
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*bfs..15]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to},
)
def benchmark__arango__shortest_path_with_filter(self):
vertex_from, vertex_to = self._get_random_from_to()
return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to})
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to},
)
# Our benchmark queries
def benchmark__create__edge(self):
vertex_from, vertex_to = self._get_random_from_to()
return ("MATCH (a:User {id: $from}), (b:User {id: $to}) "
"CREATE (a)-[:TempEdge]->(b)",
{"from": vertex_from, "to": vertex_to})
return (
"MATCH (a:User {id: $from}), (b:User {id: $to}) " "CREATE (a)-[:TempEdge]->(b)",
{"from": vertex_from, "to": vertex_to},
)
def benchmark__create__pattern(self):
return ("CREATE ()-[:TempEdge]->()", {})
@ -251,9 +247,12 @@ class Pokec(Dataset):
return ("CREATE ()", {})
def benchmark__create__vertex_big(self):
return ("CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, "
"p3: \"Here is some text that is not extremely short\", "
"p4:\"Short text\", p5: 234.434, p6: 11.11, p7: false})", {})
return (
"CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, "
'p3: "Here is some text that is not extremely short", '
'p4:"Short text", p5: 234.434, p6: 11.11, p7: false})',
{},
)
def benchmark__aggregation__count(self):
return ("MATCH (n) RETURN count(n), count(n.age)", {})
@ -262,29 +261,124 @@ class Pokec(Dataset):
return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {})
def benchmark__match__pattern_cycle(self):
return ("MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) "
"RETURN e1, m, e2",
{"id": self._get_random_vertex()})
return ("MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2", {"id": self._get_random_vertex()})
def benchmark__match__pattern_long(self):
return ("MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->"
"(n3)-[e3]->(n4)<-[e4]-(n5) "
"RETURN n5 LIMIT 1",
{"id": self._get_random_vertex()})
return (
"MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->" "(n3)-[e3]->(n4)<-[e4]-(n5) " "RETURN n5 LIMIT 1",
{"id": self._get_random_vertex()},
)
def benchmark__match__pattern_short(self):
return ("MATCH (n:User {id: $id})-[e]->(m) "
"RETURN m LIMIT 1",
{"id": self._get_random_vertex()})
return ("MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1", {"id": self._get_random_vertex()})
def benchmark__match__vertex_on_label_property(self):
return ("MATCH (n:User) WITH n WHERE n.id = $id RETURN n",
{"id": self._get_random_vertex()})
return ("MATCH (n:User) WITH n WHERE n.id = $id RETURN n", {"id": self._get_random_vertex()})
def benchmark__match__vertex_on_label_property_index(self):
return ("MATCH (n:User {id: $id}) RETURN n",
{"id": self._get_random_vertex()})
return ("MATCH (n:User {id: $id}) RETURN n", {"id": self._get_random_vertex()})
def benchmark__match__vertex_on_property(self):
return ("MATCH (n {id: $id}) RETURN n",
{"id": self._get_random_vertex()})
return ("MATCH (n {id: $id}) RETURN n", {"id": self._get_random_vertex()})
class AccessControl(Dataset):
# Explaination of datasets:
# - empty_only_index: contains index; contains no data
# - small/medium/large: contains index; contains data (respectively small/medium/large dataset)
#
# See dataset_creator.py to understand the datamodel and generate a dataset
NAME = "accesscontrol"
VARIANTS = ["empty_only_index", "small", "medium", "large"]
DEFAULT_VARIANT = "empty_only_index"
URLS = {
"empty_only_index": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/accesscontrol/accesscontrol_empty_only_index.setup.cypher.gz",
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/accesscontrol/accesscontrol_small.setup.cypher.gz",
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/accesscontrol/accesscontrol_medium.setup.cypher.gz",
"large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/accesscontrol/accesscontrol_large.setup.cypher.gz",
}
SIZES = {
"empty_only_index": {
"vertices": 0,
"edges": -1, # not used
"uuid_ranges": {
"File": {"first_uuid": 0, "last_uuid": 0},
"Permission": {"first_uuid": 0, "last_uuid": 0},
"Identity": {"first_uuid": 0, "last_uuid": 0},
},
},
"small": {
"vertices": 30,
"edges": -1, # not used
"uuid_ranges": {
"File": {"first_uuid": 1, "last_uuid": 10},
"Identity": {"first_uuid": 11, "last_uuid": 20},
"Permission": {"first_uuid": 21, "last_uuid": 120}, # 120=10*10+20
},
},
"medium": {
"vertices": 3000,
"edges": -1, # not used
"uuid_ranges": {
"File": {"first_uuid": 1, "last_uuid": 1000},
"Identity": {"first_uuid": 1001, "last_uuid": 2000},
"Permission": {"first_uuid": 2001, "last_uuid": 1002000}, # 1002000=1000*1000+2000
},
},
"large": {
"vertices": 30000,
"edges": -1, # not used
"uuid_ranges": {
"File": {"first_uuid": 1, "last_uuid": 10000},
"Identity": {"first_uuid": 10001, "last_uuid": 20000},
"Permission": {"first_uuid": 20001, "last_uuid": 100020000}, # 100020000=10000*10000+20000
},
},
}
def _get_random_uuid(self, type):
assert type in ["File", "Permission", "Identity"]
first_uuid = self.get_size()["uuid_ranges"][type]["first_uuid"]
last_uuid = self.get_size()["uuid_ranges"][type]["last_uuid"]
random_value = random.randint(first_uuid, last_uuid)
return random_value
def __init__(self, variant=None):
super().__init__(variant)
self.next_value_idx = self.get_size()["vertices"] + 1
def benchmark__create__vertex(self):
self.next_value_idx += 1
query = (f"CREATE (:File {{uuid: {self.next_value_idx}}});", {})
return query
def benchmark__create__edges(self):
permission_uuid = self._get_random_uuid("Permission")
file_uuid = self._get_random_uuid("File")
query = (
"MATCH (permission:Permission {uuid: $permission_uuid}), (file:File {uuid: $file_uuid}) "
"CREATE (permission)-[:IS_FOR_FILE]->(file)",
{"permission_uuid": permission_uuid, "file_uuid": file_uuid},
)
return query
def benchmark__match__match_all_vertices(self):
self.next_value_idx += 1
query = ("MATCH (n) RETURN *", {})
return query
def benchmark__match__match_on_labelled_vertices(self):
self.next_value_idx += 1
query = ("MATCH (n:File) RETURN *", {})
return query
def benchmark__match__match_all_vertices_with_edges(self):
self.next_value_idx += 1
query = ("MATCH (permission:Permission)-[e:IS_FOR_FILE]->(file:File) RETURN *", {})
return query

View File

@ -14,7 +14,6 @@ import json
import os
import subprocess
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -28,18 +27,25 @@ def get_binary_path(path, base=""):
def download_file(url, path):
ret = subprocess.run(["wget", "-nv", "--content-disposition", url],
stderr=subprocess.PIPE, cwd=path, check=True)
data = ret.stderr.decode("utf-8")
tmp = data.split("->")[1]
name = tmp[tmp.index('"') + 1:tmp.rindex('"')]
return os.path.join(path, name)
if "https://" in url:
ret = subprocess.run(
["wget", "-nv", "--content-disposition", url], stderr=subprocess.PIPE, cwd=path, check=True
)
data = ret.stderr.decode("utf-8")
tmp = data.split("->")[1]
name = tmp[tmp.index('"') + 1 : tmp.rindex('"')]
return os.path.join(path, name)
else:
assert os.path.exists(url)
subprocess.run(["cp", url, path], stderr=subprocess.PIPE, cwd=path, check=True)
tmp = url.split("/")
name = tmp[len(tmp) - 1]
return os.path.join(path, name)
def unpack_and_move_file(input_path, output_path):
if input_path.endswith(".gz"):
subprocess.run(["gunzip", input_path],
stdout=subprocess.DEVNULL, check=True)
subprocess.run(["gunzip", input_path], stdout=subprocess.DEVNULL, check=True)
input_path = input_path[:-3]
os.rename(input_path, output_path)

View File

@ -40,8 +40,7 @@ def _convert_args_to_flags(*args, **kwargs):
def _get_usage(pid):
total_cpu = 0
with open("/proc/{}/stat".format(pid)) as f:
total_cpu = (sum(map(int, f.read().split(")")[1].split()[11:15])) /
os.sysconf(os.sysconf_names["SC_CLK_TCK"]))
total_cpu = sum(map(int, f.read().split(")")[1].split()[11:15])) / os.sysconf(os.sysconf_names["SC_CLK_TCK"])
peak_rss = 0
with open("/proc/{}/status".format(pid)) as f:
for row in f:
@ -52,18 +51,17 @@ def _get_usage(pid):
class Memgraph:
def __init__(self, memgraph_binary, temporary_dir, properties_on_edges):
def __init__(self, memgraph_binary, temporary_dir, properties_on_edges, extra_args):
self._memgraph_binary = memgraph_binary
self._directory = tempfile.TemporaryDirectory(dir=temporary_dir)
self._properties_on_edges = properties_on_edges
self._proc_mg = None
self._extra_args = extra_args
atexit.register(self._cleanup)
# Determine Memgraph version
ret = subprocess.run([memgraph_binary, "--version"],
stdout=subprocess.PIPE, check=True)
version = re.search(r"[0-9]+\.[0-9]+\.[0-9]+",
ret.stdout.decode("utf-8")).group(0)
ret = subprocess.run([memgraph_binary, "--version"], stdout=subprocess.PIPE, check=True)
version = re.search(r"[0-9]+\.[0-9]+\.[0-9]+", ret.stdout.decode("utf-8")).group(0)
self._memgraph_version = tuple(map(int, version.split(".")))
def __del__(self):
@ -79,8 +77,14 @@ class Memgraph:
if self._memgraph_version >= (0, 50, 0):
kwargs["storage_properties_on_edges"] = self._properties_on_edges
else:
assert self._properties_on_edges, \
"Older versions of Memgraph can't disable properties on edges!"
assert self._properties_on_edges, "Older versions of Memgraph can't disable properties on edges!"
if self._extra_args != "":
args_list = self._extra_args.split(" ")
assert len(args_list) % 2 == 0
for i in range(0, len(args_list), 2):
kwargs[args_list[i]] = args_list[i + 1]
return _convert_args_to_flags(self._memgraph_binary, **kwargs)
def _start(self, **kwargs):
@ -94,8 +98,7 @@ class Memgraph:
raise Exception("The database process died prematurely!")
wait_for_server(7687)
ret = self._proc_mg.poll()
assert ret is None, "The database process died prematurely " \
"({})!".format(ret)
assert ret is None, "The database process died prematurely " "({})!".format(ret)
def _cleanup(self):
if self._proc_mg is None:
@ -121,8 +124,7 @@ class Memgraph:
def stop(self):
ret, usage = self._cleanup()
assert ret == 0, "The database process exited with a non-zero " \
"status ({})!".format(ret)
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
return usage
@ -135,8 +137,7 @@ class Client:
return _convert_args_to_flags(self._client_binary, **kwargs)
def execute(self, queries=None, file_path=None, num_workers=1):
if (queries is None and file_path is None) or \
(queries is not None and file_path is not None):
if (queries is None and file_path is None) or (queries is not None and file_path is not None):
raise ValueError("Either queries or input_path must be specified!")
# TODO: check `file_path.endswith(".json")` to support advanced
@ -151,8 +152,8 @@ class Client:
json.dump(query, f)
f.write("\n")
args = self._get_args(input=file_path, num_workers=num_workers,
queries_json=queries_json)
args = self._get_args(input=file_path, num_workers=num_workers, queries_json=queries_json)
ret = subprocess.run(args, stdout=subprocess.PIPE, check=True)
data = ret.stdout.decode("utf-8").strip().split("\n")
data = [x for x in data if not x.startswith("[")]
return list(map(json.loads, data))

View File

@ -0,0 +1,36 @@
4
uuid
email
name
platformId
2
IS_FOR_IDENTITY
IS_FOR_FILE
3
File
1
uuid
int
1
[1]
Identity
1
uuid
int
1
[10001]
Permission
1
uuid
int
10
[20001]
[10020000]
[20002000]
[30002000]
[40002000]
[50002000]
[60002000]
[70002000]
[80002000]
[90002000]

View File

@ -0,0 +1,36 @@
4
uuid
email
name
platformId
2
IS_FOR_IDENTITY
IS_FOR_FILE
3
File
1
uuid
int
1
[1]
Identity
1
uuid
int
1
[1001]
Permission
1
uuid
int
10
[2001]
[102000]
[202000]
[302000]
[402000]
[502000]
[602000]
[702000]
[802000]
[902000]

View File

@ -0,0 +1,36 @@
4
uuid
email
name
platformId
2
IS_FOR_IDENTITY
IS_FOR_FILE
3
File
1
uuid
int
1
[1]
Identity
1
uuid
int
1
[11]
Permission
1
uuid
int
10
[21]
[31]
[41]
[51]
[61]
[71]
[81]
[91]
[100]
[110]