Add mixed workload and Neo4j client to mgbench (#566)

* Fix bolt bug inside the C++ client
* Add tail latency stats
* Add hot run option
* Add query caching
* Add jcmd memory tracking
Ante Javor 2022-11-28 08:47:22 +01:00 committed by GitHub
parent 1d5f387ddd
commit 11300960de
7 changed files with 1685 additions and 284 deletions


@ -90,7 +90,7 @@ QueryData Client::Execute(const std::string &query, const std::map<std::string,
// It is super critical from a performance point of view to send the pull message right after the run message. Otherwise,
// the performance will degrade by multiple orders of magnitude.
encoder_.MessageRun(query, parameters, {});
encoder_.MessagePull({});
encoder_.MessagePull({{"n", Value(-1)}});
spdlog::debug("Reading run message response");
Signature signature{};

tests/mgbench/README.md Normal file

@ -0,0 +1,287 @@
# mgBench: Benchmark for graph databases
## Benchmark Overview
mgBench is primarily designed to benchmark graph databases. To test graph database performance, this benchmark executes Cypher queries (write, read, update, aggregate, and analyze) on a given dataset. The queries are general and represent a typical workload that would be used to analyze any graph dataset. The [BenchGraph](https://memgraph.github.io/benchgraph) platform shows the results of running these queries on supported vendors and the overall performance of each system relative to the others.
Three workload types can be executed:
- Isolated - Concurrent execution of a single query,
- Mixed - Concurrent execution of a single query mixed with a certain percentage of queries from a designated query group,
- Realistic - Concurrent execution of queries from write, read, update and analyze groups.
Currently, the benchmark is executed on the social media dataset Pokec, available in different sizes. The full list of queries and their grouping is available as [query list](#query-list).
This methodology is designed to be read from top to bottom to understand what is being tested and how, but feel free to jump to parts that interest you.
- [mgBench: Benchmark for graph databases](#mgbench-benchmark-for-graph-databases)
- [Benchmark Overview](#benchmark-overview)
- [Design goals](#design-goals)
- [Reproducibility and validation](#reproducibility-and-validation)
- [Database compatibility](#database-compatibility)
- [Workloads](#workloads)
- [Fine-tuning](#fine-tuning)
- [Limitations](#limitations)
- [mgBench](#mgbench)
- [Important files](#important-files)
- [Prerequisites](#prerequisites)
- [Running the benchmark](#running-the-benchmark)
- [Database conditions](#database-conditions)
- [Comparing results](#comparing-results)
- [Results](#results)
- [Datasets](#datasets)
- [Pokec](#pokec)
- [Query list](#query-list)
- [Platform](#platform)
- [Intel - HP](#intel---hp)
- [Supported databases](#supported-databases)
- [Database notes](#database-notes)
- [History and Future of mgBench](#history-and-future-of-mgbench)
- [History of mgBench](#history-of-mgbench)
- [Future of mgBench](#future-of-mgbench)
## Design goals
### Reproducibility and validation
Running this benchmark is automated, and the code used to run the benchmarks is publicly available. You can [run mgBench](#running-the-benchmark) with default settings to validate the results shown on the [BenchGraph platform](https://memgraph.github.io/benchgraph). The results may differ depending on the hardware, database configuration, and other variables involved in your setup, but if the results you get are significantly different, feel free to [open a GitHub issue](https://github.com/memgraph/memgraph/issues).
In the future, the project will be expanded to include more platforms to see how systems perform on different OS and hardware configurations. If you are interested in what will be added and tested, read the section about [the future of mgBench](#future-of-mgbench).
### Database compatibility
At the moment, support for graph databases is limited. To run the benchmarks, the graph database must support Cypher query language and the Bolt protocol.
Using Cypher ensures that the executed queries are identical or similar on every supported system. Possible differences are noted in [database notes](#database-notes). A single Bolt-based C++ client queries all database systems. Using a single client minimizes client-side performance penalties and ensures fairness across different vendors.
If your database supports the given requirements, feel free to contribute and add your database to mgBench.
If your database does not support the mentioned requirements, follow the project, as support for other languages and protocols in the graph database space will be added.
### Workloads
Running queries as standalone units is simple and relatively easy to measure, but vendors often apply various caching and pre-aggregation strategies that influence the results in these kinds of scenarios. Results from running single queries can hint at the database's general performance, but in real life, a database is queried by multiple clients from multiple sides. That is why the mgBench client supports the concurrent execution of various query mixes. Concurrently writing, reading, updating and executing aggregate and analytical queries provides a better view of overall system performance than executing and measuring a single query. The queries that mgBench executes are grouped into five groups: write, read, update, aggregate and analytical.
The [BenchGraph platform](https://memgraph.github.io/benchgraph) shows results made by mgBench by executing three types of workloads:
- Isolated workload
- Mixed workload
- Realistic workload
Each of these workloads has a specific purpose:
***Isolated*** workload is the simplest test. An isolated workload goes through all the queries individually, concurrently executing a single query a predefined number of times. It is similar to executing a single query and measuring time, but more complex due to concurrency. How many times a specific query will be executed depends on the approximation of the query's latency. If a query is slower, it will be executed fewer times; if a query is faster, it will be executed more times. The approximation is based on the duration of execution on several concurrent threads, and it varies between vendors.
If a query takes arguments, the argument value is changed for each execution. Arguments are generated non-randomly, so each vendor gets the same sequence of queries with the same arguments. This makes the workload deterministic for every vendor.
The benefit of the isolated workload is that it yields a clearer picture of single-query performance. The downside is that executing the same query many times can trigger aggressive result caching on the vendor's side, which can distort the measured query times.
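The deterministic argument generation matches the `get_queries` helper in `benchmark.py` (part of this commit): the random generator is seeded with the name of the query generator, so every vendor receives an identical sequence of (query, parameters) pairs. A condensed sketch:
```
import random

def get_queries(gen, count):
    # Seed with the generator's name so the same query/argument
    # sequence is produced for every vendor.
    random.seed(gen.__name__)
    return [gen() for _ in range(count)]
```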
***Mixed*** workload executes a fixed number of queries that read, update, aggregate, or analyze the data, concurrently with a certain percentage of write queries, because writing to the database prevents aggressive caching and thus yields a more realistic picture of a single query's performance. The downside is the added influence of write performance on the results. Currently, the mgBench client does not support per-thread performance measurements, but this will be added in future iterations.
***Realistic*** workload represents real-life use cases because queries write, read, update, and perform analytics in a mixed ratio, as they would in real projects. The test executes a fixed number of queries, whose distribution is defined as a percentage of queries performing each of the four operations. The queries are selected non-randomly, so the workload is identical between different vendors. As with the rest of the workloads, all queries are executed concurrently.
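Query selection for these workloads is seeded and based on weighted sampling of the query groups, as in the `mixed_workload` helper added to `benchmark.py`. A simplified sketch, where the `queries_by_type` mapping of group name to query-generator functions is assumed to be prepared by the caller:
```
import random

def build_realistic_workload(queries_by_type, distribution, num_queries, seed):
    # distribution: percentages for (write, read, update, analytical), summing to 100.
    # Seeding the generator keeps the generated workload identical across vendors.
    random.seed(seed)
    groups = ["write", "read", "update", "analytical"]
    chosen = random.choices(population=groups, weights=distribution, k=num_queries)
    workload = []
    for group in chosen:
        query_func = random.choice(queries_by_type[group])
        workload.append(query_func())  # (query string, parameters) pair
    return workload
```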
### Fine-tuning
Each database system comes with a wide variety of possible configurations. Changing each of those configuration settings can introduce performance improvements or penalties. The focus of this benchmark is "out-of-the-box" performance without fine-tuning with the goal of having the fairest possible comparison. Fine-tuning can make some systems perform magnitudes faster, but this makes general benchmark systems hard to manage because all systems are configured differently, and fine-tuning requires vendor DB experts.
Some configuration changes are necessary for test execution and are not considered fine-tuning. For example, configuring the database to avoid Bolt client login is valid since the tests are not performed under any type of authorization. All non-default configurations are mentioned in [database notes](#database-notes).
### Limitations
Benchmarking different systems is challenging because the setup, environment, queries, workload, and dataset can benefit specific database vendors. Each vendor may have a particularly strong use-case scenario. This benchmark aims to be neutral and fair to all database vendors. Acknowledging some of the current limitations can help understand the issues you might notice:
1. mgBench measures and tracks just a tiny subset of everything that can be tracked and compared during testing. Active benchmarking is strenuous because it requires a lot of time to set up and validate. Passive benchmarking is much faster to iterate on but can have a few bugs.
2. Datasets and queries used for testing are simple. Datasets and queries in real-world environments can become quite complex. To avoid Cypher specifics, mgBench uses simple queries of different varieties. Future versions will include more complex datasets and queries.
3. The scale of the datasets used is tiny compared to production environments, which can have up to trillions of nodes and edges.
4. Query results are not verified. The queries might return different results, but only performance is measured, not correctness.
5. All tests are performed on single-node databases.
6. Architecturally different systems can be set up and measured in a biased way.
## mgBench
### Important files
Listed below are the main scripts used to run the benchmarks:
- `benchmark.py` - Script that runs the queries and workloads.
- `datasets.py` - Script that handles datasets and queries for workloads.
- `runners.py` - Script holding the configuration for different DB vendors.
- `client.cpp` - Client for querying the database.
- `graph_bench.py` - Script that starts all predefined and custom-defined workloads.
- `compare_results.py` - Script that visually compares benchmark results.
In addition to these scripts, the project also includes dataset files and index configuration files. Once the first test is executed, those files can be found in the newly generated `.cache` folder.
### Prerequisites
To execute a benchmark, you need to download a binary version of the supported databases and have Python installed on your system. Each database vendor may have external dependencies, such as CMake, a JVM, etc., so make sure to check the specific vendor prerequisites.
### Running the benchmark
To run benchmarks, use the `graph_bench.py` script, which calls all the other necessary scripts. You can start the benchmarks by executing the following command:
```
graph_bench.py
--vendor memgraph /home/memgraph/binary
--dataset-group basic
--dataset-size small
--realistic 100 30 70 0 0
--realistic 100 50 50 0 0
--realistic 100 70 30 0 0
--realistic 100 30 40 10 20
--mixed 100 30 0 0 0 70
```
Isolated workloads are always executed, and this command additionally calls for the execution of four realistic workloads with different query distributions and one mixed workload on the small dataset.
The distribution of queries from the write, read, update and analytical groups is defined in percentages and passed as arguments following the `--realistic` or `--mixed` flags.
In the example of `--realistic 100 30 40 10 20` the distribution is as follows:
- 100 - The number of queries to be executed.
- 30 - The percentage of write queries to be executed.
- 40 - The percentage of read queries to be executed.
- 10 - The percentage of update queries to be executed.
- 20 - The percentage of analytical queries to be executed.
For the `--mixed` workload argument, the first five parameters are the same, with an additional sixth parameter defining the percentage of the individual query being benchmarked.
Feel free to add different configurations if you want. Results from the above benchmark run are visible on the [BenchGraph platform](https://memgraph.github.io/benchgraph).
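Internally, the runner classifies the passed configuration into one of the three workload types; a condensed version of the `Workload` class added to `benchmark.py` in this commit:
```
class Workload:
    def __init__(self, config):
        # config: [count, write%, read%, update%, analytical%] and, for Mixed,
        # a sixth value with the percentage of the individual query.
        if len(config) == 0:
            self.name = "Isolated"
        elif len(config) >= 5:
            if sum(config[1:]) != 100:
                raise Exception("The passed percentages must sum to 100%!", config)
            self.name = "Realistic" if len(config) == 5 else "Mixed"
        self.config = config
```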
### Database conditions
In a production environment, database caches are usually warmed by regular usage or by a pre-warming procedure to provide the best possible performance. Each workload in mgBench can be executed under one of the following conditions:
- ***Hot run*** - before executing any benchmark query and taking measurements, a set of defined queries is executed to pre-warm the database.
- ***Cold run*** - no warm-up is performed on the database before taking benchmark measurements.
List of queries used for pre-warming:
```
CREATE ();
CREATE ()-[:TempEdge]->();
MATCH (n) RETURN n LIMIT 1;
```
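In a hot run, these queries are sent through the benchmark client before any measurements are taken, as in the `warmup` helper added to `benchmark.py`:
```
def warmup(client):
    # Executed only when --warmup-run is passed, before measurements start.
    client.execute(
        queries=[
            ("CREATE ();", {}),
            ("CREATE ()-[:TempEdge]->();", {}),
            ("MATCH (n) RETURN n LIMIT 1;", {}),
        ],
        num_workers=1,
    )
```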
### Comparing results
Once the benchmark has been run for a single vendor, all the results are saved in appropriately named `.json` files. A summary file containing all the results combined is also created for that vendor. These summary files are used to compare results against other vendors via the `compare_results.py` script:
```
compare_results.py
--compare
"path_to/neo4j_summary.json"
"path_to/memgraph_summary.json"
--output neo4j_vs_memgraph.html
--different-vendors
```
The output is an HTML file with a visual representation of the performance differences between the two compared vendors. The first summary JSON file passed is used as the reference point.
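The comparison is driven by a relative difference per field, checked against that field's `diff_treshold` (spelled as in `compare_results.py`). The `compute_diff` helper itself is not part of this diff, so the following is only a plausible minimal sketch of the idea:
```
def compute_diff(reference_value, new_value):
    # Hypothetical sketch: relative change of the compared result vs. the reference.
    if reference_value is None:
        return {"value": new_value}  # new result, nothing to compare against
    return {"value": new_value, "diff": (new_value - reference_value) / reference_value}
```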
## Results
Results visible in the HTML file or at [BenchGraph](https://memgraph.github.io/benchgraph) are throughput, memory, and latency. Database throughput and memory usage directly impact database usability and cost, while the latency of the query shows the base query execution duration.
***Throughput*** directly defines how performant the database is and how much query traffic it can handle in a fixed time interval. It is expressed in queries per second. In each concurrent workload, execution is split across multiple clients, and each client executes queries concurrently. The total duration is computed as the average of the concurrent clients' execution durations in seconds. In mgBench, the total count of executed queries divided by this duration defines the throughput of the concurrent execution.
Here is the code snippet from the client that calculates ***throughput*** and aggregates the metadata:
```
// Create and output summary.
Metadata final_metadata;
uint64_t final_retries = 0;
double final_duration = 0.0;
for (int i = 0; i < FLAGS_num_workers; ++i) {
final_metadata += worker_metadata[i];
final_retries += worker_retries[i];
final_duration += worker_duration[i];
}
final_duration /= FLAGS_num_workers;
nlohmann::json summary = nlohmann::json::object();
summary["count"] = queries.size();
summary["duration"] = final_duration;
summary["throughput"] = static_cast<double>(queries.size()) / final_duration;
summary["retries"] = final_retries;
summary["metadata"] = final_metadata.Export();
summary["num_workers"] = FLAGS_num_workers;
(*stream) << summary.dump() << std::endl;
```
***Memory*** usage is calculated as the ***peak RES*** (resident size) memory for each query or workload execution within mgBench. The result includes starting the database, executing the query/workload, and stopping the database. The peak RES is extracted from the database process as VmHWM (peak resident set size) before the process is stopped. The peak memory usage defines the worst-case scenario for a given query or workload, while the average RAM footprint is lower. Measuring RES over time is supported by `runners.py`; for each vendor, it is possible to add RES tracking across workload execution, but it is not reported in the results.
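On Linux, the peak resident set size can be read as the `VmHWM` field of `/proc/<pid>/status`; a minimal illustrative helper (the function name is not part of `runners.py`):
```
def peak_resident_set_bytes(pid):
    # VmHWM ("high water mark") is the peak resident set size, reported in kB.
    with open("/proc/{}/status".format(pid)) as status_file:
        for line in status_file:
            if line.startswith("VmHWM:"):
                return int(line.split()[1]) * 1024
    raise RuntimeError("VmHWM not found for pid {}".format(pid))
```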
***Latency*** is calculated from the serial execution of 100 queries of the same type on a single thread. For each query, standard statistics and tail latency data are reported. The result includes the following query execution times in seconds: min, max, mean, p99, p95, p90, p75, and p50.
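The statistics are computed from the sorted list of per-query durations, as in the `tail_latency` helper added to `benchmark.py`; a condensed sketch:
```
import math
import statistics

def tail_latency_stats(latencies):
    # latencies: durations (in seconds) of the serial single-threaded runs.
    latencies = sorted(latencies)
    n = len(latencies)

    def percentile(p):
        return latencies[math.floor(n * p) - 1]

    return {
        "iterations": n,
        "min": latencies[0],
        "max": latencies[-1],
        "mean": statistics.mean(latencies),
        "p99": percentile(0.99),
        "p95": percentile(0.95),
        "p90": percentile(0.90),
        "p75": percentile(0.75),
        "p50": percentile(0.50),
    }
```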
Each workload and all the results are based on concurrent query execution, except ***latency***. As stated in the [limitations](#limitations) section, mgBench tracks just a subset of resources, but the chapter on [the future of mgBench](#future-of-mgbench) explains the expansion plans.
## Datasets
Before workload execution, the appropriate dataset indexes are set. Each vendor can have a specific syntax for setting up indexes, but the indexes should be as structurally similar as possible.
After each workload is executed, the database is cleaned, and a new dataset is imported to provide a clean start for the following workload run. When executing isolated and mixed workloads, the database is also restarted after executing each query to minimize the impact on the following query execution.
### Pokec
Currently, the only available dataset to run the benchmarks on is the Slovak social network Pokec. It is available in three different sizes: small, medium, and large.
- [small](https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_small_import.cypher) - vertices 10,000, edges 121,716
- [medium](https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_medium_import.cypher) - vertices 100,000, edges 1,768,515
- [large](https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_large.setup.cypher.gz) - vertices 1,632,803, edges 30,622,564.
The dataset is imported as a CYPHERL file of Cypher queries. Feel free to check the dataset links above for the complete list of queries.
Once the script is started, a single index is configured on `(:User {id})`. Only then are the queries executed.
Index queries for each supported vendor can be downloaded from `https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/vendor_name.cypher`; just make sure to use the proper vendor name, such as `memgraph.cypher`.
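The mapping of vendor name to index file is defined per dataset via the `INDEX_FILES` dictionary added to `datasets.py` in this commit; a condensed sketch of how the file is selected (the `index_url_for` helper name is illustrative):
```
INDEX_FILES = {
    "memgraph": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/memgraph.cypher",
    "neo4j": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/neo4j.cypher",
}

def index_url_for(vendor):
    # The benchmark downloads and caches this file, then executes the index
    # queries before importing the dataset.
    if vendor not in INDEX_FILES:
        raise ValueError("Vendor does not have an index file for this dataset!")
    return INDEX_FILES[vendor]
```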
### Query list
| |Name | Group | Query |
|-|-----| -- | ------------ |
|Q1|aggregate | aggregate | MATCH (n:User) RETURN n.age, COUNT(*)|
|Q2|aggregate_count | aggregate | MATCH (n) RETURN count(n), count(n.age)|
|Q3|aggregate_with_filter | aggregate | MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)|
|Q4|min_max_avg | aggregate | MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)|
|Q5|expansion_1 | analytical | MATCH (s:User {id: $id})-->(n:User) RETURN n.id|
|Q6|expansion_1_with_filter| analytical | MATCH (s:User {id: $id})-->(n:User) WHERE n.age >= 18 RETURN n.id|
|Q7|expansion_2| analytical | MATCH (s:User {id: $id})-->()-->(n:User) RETURN DISTINCT n.id|
|Q8|expansion_2_with_filter| analytical | MATCH (s:User {id: $id})-->()-->(n:User) WHERE n.age >= 18 RETURN DISTINCT n.id|
|Q9|expansion_3| analytical | MATCH (s:User {id: $id})-->()-->()-->(n:User) RETURN DISTINCT n.id|
|Q10|expansion_3_with_filter| analytical | MATCH (s:User {id: $id})-->()-->()-->(n:User) WHERE n.age >= 18 RETURN DISTINCT n.id|
|Q11|expansion_4| analytical | MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) RETURN DISTINCT n.id|
|Q12|expansion_4_with_filter| analytical | MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) WHERE n.age >= 18 RETURN DISTINCT n.id|
|Q13|neighbours_2| analytical | MATCH (s:User {id: $id})-[*1..2]->(n:User) RETURN DISTINCT n.id|
|Q14|neighbours_2_with_filter| analytical | MATCH (s:User {id: $id})-[*1..2]->(n:User) WHERE n.age >= 18 RETURN DISTINCT n.id|
|Q15|neighbours_2_with_data| analytical | MATCH (s:User {id: $id})-[*1..2]->(n:User) RETURN DISTINCT n.id, n|
|Q16|neighbours_2_with_data_and_filter| analytical | MATCH (s:User {id: $id})-[*1..2]->(n:User) WHERE n.age >= 18 RETURN DISTINCT n.id, n|
|Q17|pattern_cycle| analytical | MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) RETURN e1, m, e2|
|Q18|pattern_long| analytical | MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->(n3)-[e3]->(n4)<-[e4]-(n5) RETURN n5 LIMIT 1|
|Q19|pattern_short| analytical | MATCH (n:User {id: $id})-[e]->(m) RETURN m LIMIT 1|
|Q20|single_edge_write| write | MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m CREATE (n)-[e:Temp]->(m) RETURN e|
|Q21|single_vertex_write| write |CREATE (n:UserTemp {id : $id}) RETURN n|
|Q22|single_vertex_property_update| update | MATCH (n:User {id: $id})-[e]->(m) RETURN m LIMIT 1|
|Q23|single_vertex_read| read | MATCH (n:User {id : $id}) RETURN n|
## Platform
Testing on different hardware platforms and cloud VMs is essential for validating benchmark results. Currently, the tests are run on two different platforms.
### Intel - HP
- Server: HP DL360 G6
- CPU: 2 x Intel Xeon X5650 6C12T @ 2.67GHz
- RAM: 144GB
- OS: Debian 4.19
## Supported databases
Due to the current [database compatibility](#database-compatibility) requirements, the only supported database systems at the moment are:
1. Memgraph v2.4
2. Neo4j Community Edition v5.1.
Feel free to contribute and add more databases.
### Database notes
Running configurations that differ from default configuration:
- Memgraph - `storage_snapshot_on_exit=true`, `storage_recover_on_startup=true`
- Neo4j - `dbms.security.auth_enabled=false`
## History and Future of mgBench
### History of mgBench
Infrastructure around mgBench was developed to test and maintain Memgraph's performance. When critical code is changed, a performance test is run on Memgraph's CI/CD infrastructure to ensure performance is not impacted. Because mgBench is used for internal testing, some parts of the code are still tightly connected to Memgraph's CI/CD infrastructure. The remains of that code do not impact benchmark setup or performance in any way.
### Future of mgBench
We have big plans for the mgBench infrastructure that address the above-mentioned [limitations](#limitations). Even though a basic dataset can give a solid indication of performance, adding bigger and more complex datasets is a priority to enable the execution of complex analytical queries.
Also high on the list is expanding the list of vendors and providing support for different protocols and languages. The goal is to use mgBench to see how well Memgraph performs on various benchmark tasks and to publicly commit to improving.
mgBench is currently a passive benchmark since resource usage and saturation across execution are not tracked. Sanity checks were performed, but these values are needed to get the full picture after each test. mgBench also deserves its own repository, and it will be decoupled from Memgraph's testing infrastructure.


@ -1,6 +1,6 @@
#!/usr/bin/env python3
# Copyright 2021 Memgraph Ltd.
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -17,66 +17,20 @@ import copy
import fnmatch
import inspect
import json
import math
import multiprocessing
import random
import statistics
import sys
import datasets
import log
import helpers
import log
import runners
WITH_FINE_GRAINED_AUTHORIZATION = "with_fine_grained_authorization"
WITHOUT_FINE_GRAINED_AUTHORIZATION = "without_fine_grained_authorization"
def get_queries(gen, count):
# Make the generator deterministic.
random.seed(gen.__name__)
# Generate queries.
ret = []
for i in range(count):
ret.append(gen())
return ret
def match_patterns(dataset, variant, group, test, is_default_variant, patterns):
for pattern in patterns:
verdict = [fnmatch.fnmatchcase(dataset, pattern[0])]
if pattern[1] != "":
verdict.append(fnmatch.fnmatchcase(variant, pattern[1]))
else:
verdict.append(is_default_variant)
verdict.append(fnmatch.fnmatchcase(group, pattern[2]))
verdict.append(fnmatch.fnmatchcase(test, pattern[3]))
if all(verdict):
return True
return False
def filter_benchmarks(generators, patterns):
patterns = copy.deepcopy(patterns)
for i in range(len(patterns)):
pattern = patterns[i].split("/")
if len(pattern) > 4 or len(pattern) == 0:
raise Exception("Invalid benchmark description '" + pattern + "'!")
pattern.extend(["", "*", "*"][len(pattern) - 1 :])
patterns[i] = pattern
filtered = []
for dataset in sorted(generators.keys()):
generator, tests = generators[dataset]
for variant in generator.VARIANTS:
is_default_variant = variant == generator.DEFAULT_VARIANT
current = collections.defaultdict(list)
for group in tests:
for test_name, test_func in tests[group]:
if match_patterns(dataset, variant, group, test_name, is_default_variant, patterns):
current[group].append((test_name, test_func))
if len(current) > 0:
filtered.append((generator(variant), dict(current)))
return filtered
# Parse options.
parser = argparse.ArgumentParser(
description="Memgraph benchmark executor.",
@ -89,22 +43,28 @@ parser.add_argument(
help="descriptions of benchmarks that should be run; "
"multiple descriptions can be specified to run multiple "
"benchmarks; the description is specified as "
"dataset/variant/group/test; Unix shell-style wildcards "
"can be used in the descriptions; variant, group and test "
"dataset/variant/group/query; Unix shell-style wildcards "
"can be used in the descriptions; variant, group and query "
"are optional and they can be left out; the default "
"variant is '' which selects the default dataset variant; "
"the default group is '*' which selects all groups; the "
"default test is '*' which selects all tests",
"the default group is '*' which selects all groups; the"
"default query is '*' which selects all queries",
)
parser.add_argument(
"--memgraph-binary",
"--vendor-binary",
help="Vendor binary used for benchmarking, by default it is memgraph",
default=helpers.get_binary_path("memgraph"),
help="Memgraph binary used for benchmarking",
)
parser.add_argument(
"--vendor-name",
default="memgraph",
help="Input vendor binary name (memgraph, neo4j)",
)
parser.add_argument(
"--client-binary",
default=helpers.get_binary_path("tests/mgbench/client"),
help="client binary used for benchmarking",
help="Client binary used for benchmarking",
)
parser.add_argument(
"--num-workers-for-import",
@ -122,7 +82,7 @@ parser.add_argument(
"--single-threaded-runtime-sec",
type=int,
default=10,
help="single threaded duration of each test",
help="single threaded duration of each query",
)
parser.add_argument(
"--no-load-query-counts",
@ -145,9 +105,368 @@ parser.add_argument(
help="directory path where temporary data should " "be stored",
)
parser.add_argument("--no-properties-on-edges", action="store_true", help="disable properties on edges")
parser.add_argument("--bolt-port", default=7687, help="memgraph bolt port")
parser.add_argument(
"--no-authorization",
action="store_false",
default=True,
help="Run each query with authorization",
)
parser.add_argument(
"--warmup-run",
action="store_true",
default=False,
help="Run warmup before benchmarks",
)
parser.add_argument(
"--mixed-workload",
nargs="*",
type=int,
default=[],
help="""Define combination that defines the mixed workload.
Mixed workload can be run as a single configuration for all groups of queries,
Pass the positional arguments as values of what percentage of
write/read/update/analytical queries you want to have in your workload.
Example: --mixed-workload 1000 20 70 10 0 will execute 1000 queries, 20% write,
70% read, 10% update and 0% analytical.
Mixed workload can also be run on each query under some defined load.
By passing one more positional argument, you are defining what percentage of that query
will be in mixed workload, and this is executed for each query. The rest of the queries will be
selected from the appropriate groups
Running --mixed-workload 1000 30 0 0 0 70, will execute each query 700 times or 70%,
with the presence of 300 write queries from write type or 30%""",
)
parser.add_argument("--tail-latency", type=int, default=100, help="Number of queries for the tail latency statistics")
parser.add_argument(
"--performance-tracking",
action="store_true",
default=False,
help="Flag for runners performance tracking, this logs RES through time and vendor specific performance tracking.",
)
args = parser.parse_args()
class Workload:
def __init__(self, config):
config_len = len(config)
if config_len == 0:
self.name = "Isolated"
self.config = config
elif config_len >= 5:
if sum(config[1:]) != 100:
raise Exception(
"Please make sure that passed arguments % sum to 100% percent!, passed: ",
config,
)
if config_len == 5:
self.name = "Realistic"
self.config = config
else:
self.name = "Mixed"
self.config = config
def get_queries(gen, count):
# Make the generator deterministic.
random.seed(gen.__name__)
# Generate queries.
ret = []
for i in range(count):
ret.append(gen())
return ret
def match_patterns(dataset, variant, group, query, is_default_variant, patterns):
for pattern in patterns:
verdict = [fnmatch.fnmatchcase(dataset, pattern[0])]
if pattern[1] != "":
verdict.append(fnmatch.fnmatchcase(variant, pattern[1]))
else:
verdict.append(is_default_variant)
verdict.append(fnmatch.fnmatchcase(group, pattern[2]))
verdict.append(fnmatch.fnmatchcase(query, pattern[3]))
if all(verdict):
return True
return False
def filter_benchmarks(generators, patterns):
patterns = copy.deepcopy(patterns)
for i in range(len(patterns)):
pattern = patterns[i].split("/")
if len(pattern) > 5 or len(pattern) == 0:
raise Exception("Invalid benchmark description '" + pattern + "'!")
pattern.extend(["", "*", "*"][len(pattern) - 1 :])
patterns[i] = pattern
filtered = []
for dataset in sorted(generators.keys()):
generator, queries = generators[dataset]
for variant in generator.VARIANTS:
is_default_variant = variant == generator.DEFAULT_VARIANT
current = collections.defaultdict(list)
for group in queries:
for query_name, query_func in queries[group]:
if match_patterns(
dataset,
variant,
group,
query_name,
is_default_variant,
patterns,
):
current[group].append((query_name, query_func))
if len(current) > 0:
filtered.append((generator(variant, args.vendor_name), dict(current)))
return filtered
def warmup(client):
print("Executing warm-up queries")
client.execute(
queries=[
("CREATE ();", {}),
("CREATE ()-[:TempEdge]->();", {}),
("MATCH (n) RETURN n LIMIT 1;", {}),
],
num_workers=1,
)
def tail_latency(vendor, client, func):
vendor.start_benchmark("tail_latency")
if args.warmup_run:
warmup(client)
latency = []
iteration = args.tail_latency
query_list = get_queries(func, iteration)
for i in range(0, iteration):
ret = client.execute(queries=[query_list[i]], num_workers=1)
latency.append(ret[0]["duration"])
latency.sort()
query_stats = {
"iterations": iteration,
"min": latency[0],
"max": latency[iteration - 1],
"mean": statistics.mean(latency),
"p99": latency[math.floor(iteration * 0.99) - 1],
"p95": latency[math.floor(iteration * 0.95) - 1],
"p90": latency[math.floor(iteration * 0.90) - 1],
"p75": latency[math.floor(iteration * 0.75) - 1],
"p50": latency[math.floor(iteration * 0.50) - 1],
}
print("Query statistics for tail latency: ")
print(query_stats)
vendor.stop("tail_latency")
return query_stats
def mixed_workload(vendor, client, dataset, group, queries, workload):
num_of_queries = workload.config[0]
percentage_distribution = workload.config[1:]
if sum(percentage_distribution) != 100:
raise Exception(
"Please make sure that passed arguments % sum to 100% percent!, passed: ",
percentage_distribution,
)
s = [str(i) for i in workload.config]
config_distribution = "_".join(s)
print("Generating mixed workload.")
percentages_by_type = {
"write": percentage_distribution[0],
"read": percentage_distribution[1],
"update": percentage_distribution[2],
"analytical": percentage_distribution[3],
}
queries_by_type = {
"write": [],
"read": [],
"update": [],
"analytical": [],
}
for (_, funcname) in queries[group]:
for key in queries_by_type.keys():
if key in funcname:
queries_by_type[key].append(funcname)
for key, percentage in percentages_by_type.items():
if percentage != 0 and len(queries_by_type[key]) == 0:
raise Exception(
"There is a missing query in group (write, read, update or analytical) for given workload distribution."
)
random.seed(config_distribution)
# Executing mixed workload for each test
if workload.name == "Mixed":
for query, funcname in queries[group]:
full_workload = []
log.info(
"Running query in mixed workload:",
"{}/{}/{}".format(
group,
query,
funcname,
),
)
base_query = getattr(dataset, funcname)
base_query_type = funcname.rsplit("_", 1)[1]
if percentages_by_type.get(base_query_type, 0) > 0:
continue
options = ["write", "read", "update", "analytical", "query"]
function_type = random.choices(population=options, weights=percentage_distribution, k=num_of_queries)
for t in function_type:
# Get the appropriate functions with the same probability
if t == "query":
full_workload.append(base_query())
else:
funcname = random.choices(queries_by_type[t], k=1)[0]
aditional_query = getattr(dataset, funcname)
full_workload.append(aditional_query())
vendor.start_benchmark(
dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
)
if args.warmup_run:
warmup(client)
ret = client.execute(
queries=full_workload,
num_workers=args.num_workers_for_benchmark,
)[0]
usage_workload = vendor.stop(
dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
)
ret["database"] = usage_workload
results_key = [
dataset.NAME,
dataset.get_variant(),
group,
query + "_" + config_distribution,
WITHOUT_FINE_GRAINED_AUTHORIZATION,
]
results.set_value(*results_key, value=ret)
else:
# Executing mixed workload from groups of queries
full_workload = []
options = ["write", "read", "update", "analytical"]
function_type = random.choices(population=options, weights=percentage_distribution, k=num_of_queries)
for t in function_type:
# Get the appropriate functions with the same probability
funcname = random.choices(queries_by_type[t], k=1)[0]
aditional_query = getattr(dataset, funcname)
full_workload.append(aditional_query())
vendor.start_benchmark(dataset.NAME + dataset.get_variant() + "_" + workload.name + "_" + config_distribution)
if args.warmup_run:
warmup(client)
ret = client.execute(
queries=full_workload,
num_workers=args.num_workers_for_benchmark,
)[0]
usage_workload = vendor.stop(
dataset.NAME + dataset.get_variant() + "_" + workload.name + "_" + config_distribution
)
mixed_workload = {
"count": ret["count"],
"duration": ret["duration"],
"retries": ret["retries"],
"throughput": ret["throughput"],
"num_workers": ret["num_workers"],
"database": usage_workload,
}
results_key = [
dataset.NAME,
dataset.get_variant(),
group,
config_distribution,
WITHOUT_FINE_GRAINED_AUTHORIZATION,
]
results.set_value(*results_key, value=mixed_workload)
print(mixed_workload)
def get_query_cache_count(vendor, client, func, config_key):
cached_count = config.get_value(*config_key)
if cached_count is None:
print(
"Determining the number of queries necessary for",
args.single_threaded_runtime_sec,
"seconds of single-threaded runtime...",
)
# First run to prime the query caches.
vendor.start_benchmark("cache")
if args.warmup_run:
warmup(client)
client.execute(queries=get_queries(func, 1), num_workers=1)
# Get a sense of the runtime.
count = 1
while True:
ret = client.execute(queries=get_queries(func, count), num_workers=1)
duration = ret[0]["duration"]
should_execute = int(args.single_threaded_runtime_sec / (duration / count))
print(
"executed_queries={}, total_duration={}, "
"query_duration={}, estimated_count={}".format(count, duration, duration / count, should_execute)
)
# We don't have to execute the next iteration when
# `should_execute` becomes the same order of magnitude as
# `count * 10`.
if should_execute / (count * 10) < 10:
count = should_execute
break
else:
count = count * 10
vendor.stop("cache")
# Lower bound for count
if count < 20:
count = 20
config.set_value(
*config_key,
value={
"count": count,
"duration": args.single_threaded_runtime_sec,
},
)
else:
print(
"Using cached query count of",
cached_count["count"],
"queries for",
cached_count["duration"],
"seconds of single-threaded runtime.",
)
count = int(cached_count["count"] * args.single_threaded_runtime_sec / cached_count["duration"])
return count
# Testing pre commit.
# Detect available datasets.
generators = {}
for key in dir(datasets):
@ -156,13 +475,13 @@ for key in dir(datasets):
dataset = getattr(datasets, key)
if not inspect.isclass(dataset) or dataset == datasets.Dataset or not issubclass(dataset, datasets.Dataset):
continue
tests = collections.defaultdict(list)
queries = collections.defaultdict(list)
for funcname in dir(dataset):
if not funcname.startswith("benchmark__"):
continue
group, test = funcname.split("__")[1:]
tests[group].append((test, funcname))
generators[dataset.NAME] = (dataset, dict(tests))
group, query = funcname.split("__")[1:]
queries[group].append((query, funcname))
generators[dataset.NAME] = (dataset, dict(queries))
if dataset.PROPERTIES_ON_EDGES and args.no_properties_on_edges:
raise Exception(
'The "{}" dataset requires properties on edges, ' "but you have disabled them!".format(dataset.NAME)
@ -170,19 +489,19 @@ for key in dir(datasets):
# List datasets if there is no specified dataset.
if len(args.benchmarks) == 0:
log.init("Available tests")
log.init("Available queries")
for name in sorted(generators.keys()):
print("Dataset:", name)
dataset, tests = generators[name]
dataset, queries = generators[name]
print(
" Variants:",
", ".join(dataset.VARIANTS),
"(default: " + dataset.DEFAULT_VARIANT + ")",
)
for group in sorted(tests.keys()):
for group in sorted(queries.keys()):
print(" Group:", group)
for test_name, test_func in tests[group]:
print(" Test:", test_name)
for query_name, query_func in queries[group]:
print(" Query:", query_name)
sys.exit(0)
# Create cache, config and results objects.
@ -196,133 +515,121 @@ results = helpers.RecursiveDict()
# Filter out the generators.
benchmarks = filter_benchmarks(generators, args.benchmarks)
# Run all specified benchmarks.
for dataset, tests in benchmarks:
for dataset, queries in benchmarks:
workload = Workload(args.mixed_workload)
run_config = {
"vendor": args.vendor_name,
"condition": "hot" if args.warmup_run else "cold",
"workload": workload.name,
"workload_config": workload.config,
}
results.set_value("__run_configuration__", value=run_config)
log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(), "dataset")
dataset.prepare(cache.cache_directory("datasets", dataset.NAME, dataset.get_variant()))
# Prepare runners and import the dataset.
memgraph = runners.Memgraph(
args.memgraph_binary,
args.temporary_directory,
not args.no_properties_on_edges,
args.bolt_port,
)
client = runners.Client(args.client_binary, args.temporary_directory, args.bolt_port)
memgraph.start_preparation()
ret = client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
usage = memgraph.stop()
# Display import statistics.
print()
for row in ret:
print(
"Executed",
row["count"],
"queries in",
row["duration"],
"seconds using",
row["num_workers"],
"workers with a total throughput of",
row["throughput"],
"queries/second.",
# TODO: Create some abstract class for vendors, that will hold this data
if args.vendor_name == "neo4j":
vendor = runners.Neo4j(
args.vendor_binary,
args.temporary_directory,
args.bolt_port,
args.performance_tracking,
)
else:
vendor = runners.Memgraph(
args.vendor_binary,
args.temporary_directory,
not args.no_properties_on_edges,
args.bolt_port,
args.performance_tracking,
)
print()
print(
"The database used",
usage["cpu"],
"seconds of CPU time and peaked at",
usage["memory"] / 1024 / 1024,
"MiB of RAM.",
)
# Save import results.
client = runners.Client(args.client_binary, args.temporary_directory, args.bolt_port)
ret = None
usage = None
if args.vendor_name == "neo4j":
vendor.start_preparation("preparation")
print("Executing database cleanup and index setup...")
ret = client.execute(file_path=dataset.get_index(), num_workers=args.num_workers_for_import)
usage = vendor.stop("preparation")
dump_dir = cache.cache_directory("datasets", dataset.NAME, dataset.get_variant())
dump_file, exists = dump_dir.get_file("neo4j.dump")
if exists:
vendor.load_db_from_dump(path=dump_dir.get_path())
else:
vendor.start_preparation("import")
print("Importing dataset...")
ret = client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
usage = vendor.stop("import")
vendor.dump_db(path=dump_dir.get_path())
else:
vendor.start_preparation("import")
print("Executing database cleanup and index setup...")
ret = client.execute(file_path=dataset.get_index(), num_workers=args.num_workers_for_import)
print("Importing dataset...")
ret = client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
usage = vendor.stop("import")
# Save import results.
import_key = [dataset.NAME, dataset.get_variant(), "__import__"]
results.set_value(*import_key, value={"client": ret, "database": usage})
if ret != None and usage != None:
# Display import statistics.
print()
for row in ret:
print(
"Executed",
row["count"],
"queries in",
row["duration"],
"seconds using",
row["num_workers"],
"workers with a total throughput of",
row["throughput"],
"queries/second.",
)
print()
print(
"The database used",
usage["cpu"],
"seconds of CPU time and peaked at",
usage["memory"] / 1024 / 1024,
"MiB of RAM.",
)
# TODO: cache import data
results.set_value(*import_key, value={"client": ret, "database": usage})
else:
results.set_value(*import_key, value={"client": "dump_load", "database": "dump_load"})
# Run all benchmarks in all available groups.
for group in sorted(queries.keys()):
for with_fine_grained_authorization in [False, True]:
if with_fine_grained_authorization:
memgraph.start_preparation()
client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
client.execute(
queries=[
("CREATE USER user IDENTIFIED BY 'test';", {}),
("GRANT ALL PRIVILEGES TO user;", {}),
("GRANT CREATE_DELETE ON EDGE_TYPES * TO user;", {}),
("GRANT CREATE_DELETE ON LABELS * TO user;", {}),
]
)
client = runners.Client(
args.client_binary,
args.temporary_directory,
args.bolt_port,
username="user",
password="test",
)
memgraph.stop()
test_type = (
WITH_FINE_GRAINED_AUTHORIZATION if with_fine_grained_authorization else WITHOUT_FINE_GRAINED_AUTHORIZATION
)
for group in sorted(tests.keys()):
for test, funcname in tests[group]:
log.info("Running test:", "{}/{}/{}".format(group, test, test_type))
# Running queries in mixed workload
if workload.name == "Mixed" or workload.name == "Realistic":
mixed_workload(vendor, client, dataset, group, queries, workload)
else:
for query, funcname in queries[group]:
log.info(
"Running query:",
"{}/{}/{}/{}".format(group, query, funcname, WITHOUT_FINE_GRAINED_AUTHORIZATION),
)
func = getattr(dataset, funcname)
# Get number of queries to execute.
# TODO: implement minimum number of queries, `max(10, num_workers)`
config_key = [dataset.NAME, dataset.get_variant(), group, test, test_type]
cached_count = config.get_value(*config_key)
if cached_count is None:
print(
"Determining the number of queries necessary for",
args.single_threaded_runtime_sec,
"seconds of single-threaded runtime...",
)
# First run to prime the query caches.
memgraph.start_benchmark()
client.execute(queries=get_queries(func, 1), num_workers=1)
# Get a sense of the runtime.
count = 1
while True:
ret = client.execute(queries=get_queries(func, count), num_workers=1)
duration = ret[0]["duration"]
should_execute = int(args.single_threaded_runtime_sec / (duration / count))
print(
"executed_queries={}, total_duration={}, "
"query_duration={}, estimated_count={}".format(
count, duration, duration / count, should_execute
)
)
# We don't have to execute the next iteration when
# `should_execute` becomes the same order of magnitude as
# `count * 10`.
if should_execute / (count * 10) < 10:
count = should_execute
break
else:
count = count * 10
memgraph.stop()
config.set_value(
*config_key,
value={
"count": count,
"duration": args.single_threaded_runtime_sec,
},
)
else:
print(
"Using cached query count of",
cached_count["count"],
"queries for",
cached_count["duration"],
"seconds of single-threaded runtime.",
)
count = int(cached_count["count"] * args.single_threaded_runtime_sec / cached_count["duration"])
query_statistics = tail_latency(vendor, client, func)
# Query count for each vendor
config_key = [
dataset.NAME,
dataset.get_variant(),
args.vendor_name,
group,
query,
]
count = get_query_cache_count(vendor, client, func, config_key)
# Benchmark run.
print("Sample query:", get_queries(func, 1)[0][0])
@ -338,13 +645,16 @@ for dataset, tests in benchmarks:
args.num_workers_for_benchmark,
"concurrent clients.",
)
memgraph.start_benchmark()
vendor.start_benchmark(dataset.NAME + dataset.get_variant() + "_" + workload.name + "_" + query)
if args.warmup_run:
warmup(client)
ret = client.execute(
queries=get_queries(func, count),
num_workers=args.num_workers_for_benchmark,
)[0]
usage = memgraph.stop()
usage = vendor.stop(dataset.NAME + dataset.get_variant() + "_" + workload.name + "_" + query)
ret["database"] = usage
ret["query_statistics"] = query_statistics
# Output summary.
print()
@ -362,9 +672,108 @@ for dataset, tests in benchmarks:
log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
# Save results.
results_key = [dataset.NAME, dataset.get_variant(), group, test, test_type]
results_key = [
dataset.NAME,
dataset.get_variant(),
group,
query,
WITHOUT_FINE_GRAINED_AUTHORIZATION,
]
results.set_value(*results_key, value=ret)
## If there is need for authorization testing.
if args.no_authorization:
print("Running query with authorization")
vendor.start_benchmark("authorization")
client.execute(
queries=[
("CREATE USER user IDENTIFIED BY 'test';", {}),
("GRANT ALL PRIVILEGES TO user;", {}),
("GRANT CREATE_DELETE ON EDGE_TYPES * TO user;", {}),
("GRANT CREATE_DELETE ON LABELS * TO user;", {}),
]
)
client = runners.Client(
args.client_binary,
args.temporary_directory,
args.bolt_port,
username="user",
password="test",
)
vendor.stop("authorization")
for query, funcname in queries[group]:
log.info(
"Running query:",
"{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION),
)
func = getattr(dataset, funcname)
query_statistics = tail_latency(vendor, client, func)
config_key = [
dataset.NAME,
dataset.get_variant(),
args.vendor_name,
group,
query,
]
count = get_query_cache_count(vendor, client, func, config_key)
vendor.start_benchmark("authorization")
if args.warmup_run:
warmup(client)
ret = client.execute(
queries=get_queries(func, count),
num_workers=args.num_workers_for_benchmark,
)[0]
usage = vendor.stop("authorization")
ret["database"] = usage
ret["query_statistics"] = query_statistics
# Output summary.
print()
print(
"Executed",
ret["count"],
"queries in",
ret["duration"],
"seconds.",
)
print("Queries have been retried", ret["retries"], "times.")
print("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
print("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
metadata = ret["metadata"]
for key in sorted(metadata.keys()):
print(
"{name:>30}: {minimum:>20.06f} {average:>20.06f} "
"{maximum:>20.06f}".format(name=key, **metadata[key])
)
log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
# Save results.
results_key = [
dataset.NAME,
dataset.get_variant(),
group,
query,
WITH_FINE_GRAINED_AUTHORIZATION,
]
results.set_value(*results_key, value=ret)
# Clean up database from any roles and users job
vendor.start_benchmark("authorizations")
ret = client.execute(
queries=[
("REVOKE LABELS * FROM user;", {}),
("REVOKE EDGE_TYPES * FROM user;", {}),
("DROP USER user;", {}),
]
)
vendor.stop("authorization")
# Save configuration.
if not args.no_save_query_counts:
cache.save_config(config)


@ -1,6 +1,6 @@
#!/usr/bin/env python3
# Copyright 2021 Memgraph Ltd.
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -15,48 +15,6 @@ import argparse
import json
FIELDS = [
{
"name": "throughput",
"positive_diff_better": True,
"scaling": 1,
"unit": "QPS",
"diff_treshold": 0.05, # 5%
},
{
"name": "duration",
"positive_diff_better": False,
"scaling": 1,
"unit": "s",
},
{
"name": "parsing_time",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "planning_time",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "plan_execution_time",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "memory",
"positive_diff_better": False,
"scaling": 1 / 1024 / 1024,
"unit": "MiB",
"diff_treshold": 0.02, # 2%
},
]
def load_results(fname):
with open(fname) as f:
return json.load(f)
@ -77,9 +35,11 @@ def recursive_get(data, *args, value=None):
return data
def compare_results(results_from, results_to, fields, ignored):
def compare_results(results_from, results_to, fields, ignored, different_vendors):
ret = {}
for dataset, variants in results_to.items():
if dataset == "__run_configuration__":
continue
for variant, groups in variants.items():
for group, scenarios in groups.items():
if group == "__import__":
@ -89,9 +49,11 @@ def compare_results(results_from, results_to, fields, ignored):
continue
summary_from = recursive_get(results_from, dataset, variant, group, scenario, value={})
summary_from = summary_from["without_fine_grained_authorization"]
summary_to = summary_to["without_fine_grained_authorization"]
if (
len(summary_from) > 0
and summary_to["count"] != summary_from["count"]
and (summary_to["count"] != summary_from["count"] and not different_vendors)
or summary_to["num_workers"] != summary_from["num_workers"]
):
raise Exception("Incompatible results!")
@ -115,13 +77,19 @@ def compare_results(results_from, results_to, fields, ignored):
recursive_get(summary_from, "database", key, value=None),
summary_to["database"][key],
)
else:
elif summary_to.get("query_statistics") != None and key in summary_to["query_statistics"]:
row[key] = compute_diff(
recursive_get(summary_from, "query_statistics", key, value=None),
summary_to["query_statistics"][key],
)
elif not different_vendors:
row[key] = compute_diff(
recursive_get(summary_from, "metadata", key, "average", value=None),
summary_to["metadata"][key]["average"],
)
if "diff" not in row[key] or (
"diff_treshold" in field and abs(row[key]["diff"]) >= field["diff_treshold"]
if row.get(key) != None and (
"diff" not in row[key]
or ("diff_treshold" in field and abs(row[key]["diff"]) >= field["diff_treshold"])
):
performance_changed = True
if performance_changed:
@ -149,19 +117,22 @@ def generate_remarkup(fields, data):
ret += " <tr>\n"
ret += " <td>{}</td>\n".format(testcode)
for field in fields:
result = data[testcode][field["name"]]
value = result["value"] * field["scaling"]
if "diff" in result:
diff = result["diff"]
arrow = "arrow-up" if diff >= 0 else "arrow-down"
if not (field["positive_diff_better"] ^ (diff >= 0)):
color = "green"
result = data[testcode].get(field["name"])
if result != None:
value = result["value"] * field["scaling"]
if "diff" in result:
diff = result["diff"]
arrow = "arrow-up" if diff >= 0 else "arrow-down"
if not (field["positive_diff_better"] ^ (diff >= 0)):
color = "green"
else:
color = "red"
sign = "{{icon {} color={}}}".format(arrow, color)
ret += ' <td bgcolor="{}">{:.3f}{} ({:+.2%})</td>\n'.format(
color, value, field["unit"], diff
)
else:
color = "red"
sign = "{{icon {} color={}}}".format(arrow, color)
ret += ' <td bgcolor="{}">{:.3f}{} ({:+.2%})</td>\n'.format(color, value, field["unit"], diff)
else:
ret += '<td bgcolor="blue">{:.3f}{} //(new)// </td>\n'.format(value, field["unit"])
ret += '<td bgcolor="blue">{:.3f}{} //(new)// </td>\n'.format(value, field["unit"])
ret += " </tr>\n"
ret += "</table>\n"
else:
@ -181,8 +152,96 @@ if __name__ == "__main__":
parser.add_argument("--output", default="", help="output file name")
# file is read line by line, each representing one test name
parser.add_argument("--exclude_tests_file", help="file listing test names to be excluded")
parser.add_argument(
"--different-vendors",
action="store_true",
default=False,
help="Comparing different vendors, there is no need for metadata, duration, count check.",
)
parser.add_argument(
"--difference-threshold", type=float, help="Difference threshold for memory and throughput, 0.02 = 2% "
)
args = parser.parse_args()
fields = [
{
"name": "throughput",
"positive_diff_better": True,
"scaling": 1,
"unit": "QPS",
"diff_treshold": 0.05, # 5%
},
{
"name": "duration",
"positive_diff_better": False,
"scaling": 1,
"unit": "s",
},
{
"name": "parsing_time",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "planning_time",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "plan_execution_time",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "memory",
"positive_diff_better": False,
"scaling": 1 / 1024 / 1024,
"unit": "MiB",
"diff_treshold": 0.02, # 2%
},
{
"name": "max",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "p99",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "p90",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "p75",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "p50",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
{
"name": "mean",
"positive_diff_better": False,
"scaling": 1000,
"unit": "ms",
},
]
if args.compare is None or len(args.compare) == 0:
raise Exception("You must specify at least one pair of files!")
@ -192,13 +251,29 @@ if __name__ == "__main__":
else:
ignored = []
cleaned = []
if args.different_vendors:
ignore_on_different_vendors = {"duration", "parsing_time", "planning_time", "plan_execution_time"}
for field in fields:
key = field["name"]
if key in ignore_on_different_vendors:
continue
else:
cleaned.append(field)
fields = cleaned
if args.difference_threshold > 0.01:
for field in fields:
if "diff_treshold" in field.keys():
field["diff_treshold"] = args.difference_threshold
data = {}
for file_from, file_to in args.compare:
results_from = load_results(file_from)
results_to = load_results(file_to)
data.update(compare_results(results_from, results_to, FIELDS, ignored))
data.update(compare_results(results_from, results_to, fields, ignored, args.different_vendors))
remarkup = generate_remarkup(FIELDS, data)
remarkup = generate_remarkup(fields, data)
if args.output:
with open(args.output, "w") as f:
f.write(remarkup)


@ -1,4 +1,4 @@
# Copyright 2021 Memgraph Ltd.
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -27,6 +27,8 @@ class Dataset:
FILES = {
"default": "/foo/bar",
}
INDEX = None
INDEX_FILES = {"default": ""}
# List of query file URLs that should be used to import the dataset.
URLS = None
# Number of vertices/edges for each variant.
@ -36,7 +38,7 @@ class Dataset:
# Indicates whether the dataset has properties on edges.
PROPERTIES_ON_EDGES = False
def __init__(self, variant=None):
def __init__(self, variant=None, vendor=None):
"""
Accepts a `variant` variable that indicates which variant
of the dataset should be executed.
@ -49,7 +51,10 @@ class Dataset:
raise ValueError("The variant doesn't have a defined URL or " "file path!")
if variant not in self.SIZES:
raise ValueError("The variant doesn't have a defined dataset " "size!")
if vendor not in self.INDEX_FILES:
raise ValueError("Vendor does not have INDEX for dataset!")
self._variant = variant
self._vendor = vendor
if self.FILES is not None:
self._file = self.FILES.get(variant, None)
else:
@ -58,6 +63,12 @@ class Dataset:
self._url = self.URLS.get(variant, None)
else:
self._url = None
if self.INDEX_FILES is not None:
self._index = self.INDEX_FILES.get(vendor, None)
else:
self._index = None
self._size = self.SIZES[variant]
if "vertices" not in self._size or "edges" not in self._size:
raise ValueError("The size defined for this variant doesn't " "have the number of vertices and/or edges!")
@ -67,21 +78,34 @@ class Dataset:
def prepare(self, directory):
if self._file is not None:
print("Using dataset file:", self._file)
return
# TODO: add support for JSON datasets
cached_input, exists = directory.get_file("dataset.cypher")
else:
# TODO: add support for JSON datasets
cached_input, exists = directory.get_file("dataset.cypher")
if not exists:
print("Downloading dataset file:", self._url)
downloaded_file = helpers.download_file(self._url, directory.get_path())
print("Unpacking and caching file:", downloaded_file)
helpers.unpack_and_move_file(downloaded_file, cached_input)
print("Using cached dataset file:", cached_input)
self._file = cached_input
cached_index, exists = directory.get_file(self._vendor + ".cypher")
if not exists:
print("Downloading dataset file:", self._url)
downloaded_file = helpers.download_file(self._url, directory.get_path())
print("Downloading index file:", self._index)
downloaded_file = helpers.download_file(self._index, directory.get_path())
print("Unpacking and caching file:", downloaded_file)
helpers.unpack_and_move_file(downloaded_file, cached_input)
print("Using cached dataset file:", cached_input)
self._file = cached_input
helpers.unpack_and_move_file(downloaded_file, cached_index)
print("Using cached index file:", cached_index)
self._index = cached_index
def get_variant(self):
"""Returns the current variant of the dataset."""
return self._variant
def get_index(self):
"""Get index file, defined by vendor"""
return self._index
def get_file(self):
"""
Returns path to the file that contains dataset creation queries.
@ -103,16 +127,23 @@ class Pokec(Dataset):
VARIANTS = ["small", "medium", "large"]
DEFAULT_VARIANT = "small"
FILES = None
URLS = {
"small": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_small.setup.cypher",
"medium": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_medium.setup.cypher",
"large": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_large.setup.cypher.gz",
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_small_import.cypher",
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_medium_import.cypher",
"large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_large.setup.cypher.gz",
}
SIZES = {
"small": {"vertices": 10000, "edges": 121716},
"medium": {"vertices": 100000, "edges": 1768515},
"large": {"vertices": 1632803, "edges": 30622564},
}
INDEX = None
INDEX_FILES = {
"memgraph": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/memgraph.cypher",
"neo4j": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/neo4j.cypher",
}
PROPERTIES_ON_EDGES = False
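With the new vendor argument and INDEX_FILES, a dataset now resolves a vendor-specific index file alongside the import file. A minimal sketch of the intended flow; `cache_dir` is a stand-in for the cache-directory object the harness passes to `prepare()` and is not part of this diff:

    # Sketch only: `cache_dir` stands in for the harness-provided cache directory.
    dataset = Pokec(variant="small", vendor="neo4j")
    dataset.prepare(cache_dir)        # caches pokec_small_import.cypher and neo4j.cypher
    import_file = dataset.get_file()  # dataset creation queries
    index_file = dataset.get_index()  # vendor-specific index statements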
# Helpers used to generate the queries
@ -135,7 +166,10 @@ class Pokec(Dataset):
return ("MATCH (n:User {id : $id}) RETURN n", {"id": self._get_random_vertex()})
def benchmark__arango__single_vertex_write(self):
return ("CREATE (n:UserTemp {id : $id}) RETURN n", {"id": random.randint(1, self._num_vertices * 10)})
return (
"CREATE (n:UserTemp {id : $id}) RETURN n",
{"id": random.randint(1, self._num_vertices * 10)},
)
def benchmark__arango__single_edge_write(self):
vertex_from, vertex_to = self._get_random_from_to()
@ -151,7 +185,10 @@ class Pokec(Dataset):
return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {})
def benchmark__arango__expansion_1(self):
return ("MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id", {"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_1_with_filter(self):
return (
@ -160,7 +197,10 @@ class Pokec(Dataset):
)
def benchmark__arango__expansion_2(self):
return ("MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id", {"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_2_with_filter(self):
return (
@ -193,7 +233,10 @@ class Pokec(Dataset):
)
def benchmark__arango__neighbours_2(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id", {"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__neighbours_2_with_filter(self):
return (
@ -270,7 +313,10 @@ class Pokec(Dataset):
return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {})
def benchmark__match__pattern_cycle(self):
return ("MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2", {"id": self._get_random_vertex()})
return (
"MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2",
{"id": self._get_random_vertex()},
)
def benchmark__match__pattern_long(self):
return (
@ -279,10 +325,16 @@ class Pokec(Dataset):
)
def benchmark__match__pattern_short(self):
return ("MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1", {"id": self._get_random_vertex()})
return (
"MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1",
{"id": self._get_random_vertex()},
)
def benchmark__match__vertex_on_label_property(self):
return ("MATCH (n:User) WITH n WHERE n.id = $id RETURN n", {"id": self._get_random_vertex()})
return (
"MATCH (n:User) WITH n WHERE n.id = $id RETURN n",
{"id": self._get_random_vertex()},
)
def benchmark__match__vertex_on_label_property_index(self):
return ("MATCH (n:User {id: $id}) RETURN n", {"id": self._get_random_vertex()})
@ -291,4 +343,133 @@ class Pokec(Dataset):
return ("MATCH (n {id: $id}) RETURN n", {"id": self._get_random_vertex()})
def benchmark__update__vertex_on_property(self):
return ("MATCH (n {id: $id}) SET n.property = -1", {"id": self._get_random_vertex()})
return (
"MATCH (n {id: $id}) SET n.property = -1",
{"id": self._get_random_vertex()},
)
# Basic benchmark queries
def benchmark__basic__single_vertex_read_read(self):
return ("MATCH (n:User {id : $id}) RETURN n", {"id": self._get_random_vertex()})
def benchmark__basic__single_vertex_write_write(self):
return (
"CREATE (n:UserTemp {id : $id}) RETURN n",
{"id": random.randint(1, self._num_vertices * 10)},
)
def benchmark__basic__single_vertex_property_update_update(self):
return (
"MATCH (n {id: $id}) SET n.property = -1",
{"id": self._get_random_vertex()},
)
def benchmark__basic__single_edge_write_write(self):
vertex_from, vertex_to = self._get_random_from_to()
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "CREATE (n)-[e:Temp]->(m) RETURN e",
{"from": vertex_from, "to": vertex_to},
)
def benchmark__basic__aggregate_aggregate(self):
return ("MATCH (n:User) RETURN n.age, COUNT(*)", {})
def benchmark__basic__aggregate_count_aggregate(self):
return ("MATCH (n) RETURN count(n), count(n.age)", {})
def benchmark__basic__aggregate_with_filter_aggregate(self):
return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {})
def benchmark__basic__min_max_avg_aggregate(self):
return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {})
def benchmark__basic__expansion_1_analytical(self):
return (
"MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__expansion_1_with_filter_analytical(self):
return (
"MATCH (s:User {id: $id})-->(n:User) " "WHERE n.age >= 18 " "RETURN n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__expansion_2_analytical(self):
return (
"MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__expansion_2_with_filter_analytical(self):
return (
"MATCH (s:User {id: $id})-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__expansion_3_analytical(self):
return (
"MATCH (s:User {id: $id})-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__expansion_3_with_filter_analytical(self):
return (
"MATCH (s:User {id: $id})-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__expansion_4_analytical(self):
return (
"MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__expansion_4_with_filter_analytical(self):
return (
"MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__neighbours_2_analytical(self):
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__neighbours_2_with_filter_analytical(self):
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__basic__neighbours_2_with_data_analytical(self):
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()},
)
def benchmark__basic__neighbours_2_with_data_and_filter_analytical(self):
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()},
)
def benchmark__basic__pattern_cycle_analytical(self):
return (
"MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2",
{"id": self._get_random_vertex()},
)
def benchmark__basic__pattern_long_analytical(self):
return (
"MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->" "(n3)-[e3]->(n4)<-[e4]-(n5) " "RETURN n5 LIMIT 1",
{"id": self._get_random_vertex()},
)
def benchmark__basic__pattern_short_analytical(self):
return (
"MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1",
{"id": self._get_random_vertex()},
)

View File

@ -0,0 +1,210 @@
import argparse
import json
import subprocess
from pathlib import Path
def parse_arguments():
parser = argparse.ArgumentParser(
description="Run graph database benchmarks on supported databases(Memgraph and Neo4j)",
)
parser.add_argument(
"--vendor",
nargs=2,
action="append",
metavar=("vendor_name", "vendor_binary"),
help="Forward name and paths to vendors binary"
"Example: --vendor memgraph /path/to/binary --vendor neo4j /path/to/binary",
)
parser.add_argument(
"--dataset-size",
default="small",
choices=["small", "medium", "large"],
help="Pick a dataset size (small, medium, large)",
)
parser.add_argument("--dataset-group", default="basic", help="Select a group of queries")
parser.add_argument(
"--realistic",
nargs=5,
action="append",
metavar=("num_of_queries", "write", "read", "update", "analytical"),
help="Forward config for group run",
)
parser.add_argument(
"--mixed",
nargs=6,
action="append",
metavar=(
"num_of_queries",
"write",
"read",
"update",
"analytical",
"query_percentage",
),
help="Forward config for query",
)
args = parser.parse_args()
return args
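A hedged example of a full invocation that combines these flags; the binary paths and workload percentages are illustrative only:

    # Illustrative only: equivalent to running graph_bench.py from the shell with
    # one realistic and one mixed workload configuration for each vendor.
    import subprocess

    subprocess.run(
        [
            "python3", "graph_bench.py",
            "--vendor", "memgraph", "/path/to/memgraph/binary",
            "--vendor", "neo4j", "/path/to/neo4j/home",
            "--dataset-size", "small",
            "--dataset-group", "basic",
            "--realistic", "500", "30", "70", "0", "0",
            "--mixed", "500", "30", "70", "0", "0", "40",
        ],
        check=True,
    )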
def run_full_benchmarks(vendor, binary, dataset_size, dataset_group, realistic, mixed):
configurations = [
# Basic full group test cold
[
"--export-results",
vendor + "_" + dataset_size + "_cold_isolated.json",
],
# Basic full group test hot
[
"--export-results",
vendor + "_" + dataset_size + "_hot_isolated.json",
"--warmup-run",
],
]
# Configurations for full workload
for count, write, read, update, analytical in realistic:
cold = [
"--export-results",
vendor
+ "_"
+ dataset_size
+ "_cold_realistic_{}_{}_{}_{}_{}.json".format(count, write, read, update, analytical),
"--mixed-workload",
count,
write,
read,
update,
analytical,
]
hot = [
"--export-results",
vendor
+ "_"
+ dataset_size
+ "_hot_realistic_{}_{}_{}_{}_{}.json".format(count, write, read, update, analytical),
"--warmup-run",
"--mixed-workload",
count,
write,
read,
update,
analytical,
]
configurations.append(cold)
configurations.append(hot)
# Configurations for workload per query
for count, write, read, update, analytical, query in mixed:
cold = [
"--export-results",
vendor
+ "_"
+ dataset_size
+ "_cold_mixed_{}_{}_{}_{}_{}_{}.json".format(count, write, read, update, analytical, query),
"--mixed-workload",
count,
write,
read,
update,
analytical,
query,
]
hot = [
"--export-results",
vendor
+ "_"
+ dataset_size
+ "_hot_mixed_{}_{}_{}_{}_{}_{}.json".format(count, write, read, update, analytical, query),
"--warmup-run",
"--mixed-workload",
count,
write,
read,
update,
analytical,
query,
]
configurations.append(cold)
configurations.append(hot)
default_args = [
"python3",
"benchmark.py",
"--vendor-binary",
binary,
"--vendor-name",
vendor,
"--num-workers-for-benchmark",
"12",
"--no-authorization",
"pokec/" + dataset_size + "/" + dataset_group + "/*",
]
for config in configurations:
full_config = default_args + config
print(full_config)
subprocess.run(args=full_config, check=True)
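For reference, the first (cold, isolated) configuration composes an argument list roughly like the following; only the vendor binary path is made up:

    # Roughly the composed argument list for the cold isolated run of Memgraph on
    # the small dataset (binary path illustrative):
    full_config = [
        "python3", "benchmark.py",
        "--vendor-binary", "/path/to/memgraph/binary",
        "--vendor-name", "memgraph",
        "--num-workers-for-benchmark", "12",
        "--no-authorization",
        "pokec/small/basic/*",
        "--export-results", "memgraph_small_cold_isolated.json",
    ]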
def collect_all_results(vendor_name, dataset_size, dataset_group):
working_directory = Path().absolute()
print(working_directory)
results = sorted(working_directory.glob(vendor_name + "_" + dataset_size + "_*.json"))
summary = {"pokec": {dataset_size: {dataset_group: {}}}}
for file in results:
if "summary" in file.name:
continue
data = json.loads(file.read_text())
if data["__run_configuration__"]["condition"] == "hot":
for key, value in data["pokec"][dataset_size][dataset_group].items():
key_condition = key + "_hot"
summary["pokec"][dataset_size][dataset_group][key_condition] = value
elif data["__run_configuration__"]["condition"] == "cold":
for key, value in data["pokec"][dataset_size][dataset_group].items():
key_condition = key + "_cold"
summary["pokec"][dataset_size][dataset_group][key_condition] = value
print(json.dumps(summary, indent=4))
with open(vendor_name + "_" + dataset_size + "_summary.json", "w") as f:
json.dump(summary, f)
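The resulting `<vendor>_<dataset_size>_summary.json` nests hot and cold results per query under the dataset, size, and group keys, roughly as sketched below (query names are illustrative):

    # Approximate shape of memgraph_small_summary.json:
    # {
    #     "pokec": {
    #         "small": {
    #             "basic": {
    #                 "aggregate_aggregate_hot": {...},
    #                 "aggregate_aggregate_cold": {...},
    #                 "expansion_1_analytical_hot": {...},
    #                 ...
    #             }
    #         }
    #     }
    # }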
if __name__ == "__main__":
args = parse_arguments()
realistic = args.realistic if args.realistic else []
mixed = args.mixed if args.mixed else []
vendor_names = {"memgraph", "neo4j"}
for vendor_name, vendor_binary in args.vendor:
path = Path(vendor_binary)
if vendor_name.lower() in vendor_names and (path.is_file() or path.is_dir()):
run_full_benchmarks(
vendor_name,
vendor_binary,
args.dataset_size,
args.dataset_group,
realistic,
mixed,
)
collect_all_results(vendor_name, args.dataset_size, args.dataset_group)
else:
raise Exception(
"Check that vendor: {} is supported and you are passing right path: {} to binary.".format(
vendor_name, path
)
)

View File

@ -1,4 +1,4 @@
# Copyright 2021 Memgraph Ltd.
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -15,7 +15,9 @@ import os
import re
import subprocess
import tempfile
import threading
import time
from pathlib import Path
def wait_for_server(port, delay=0.1):
@ -50,13 +52,26 @@ def _get_usage(pid):
return {"cpu": total_cpu, "memory": peak_rss}
def _get_current_usage(pid):
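# VmRSS in /proc/<pid>/status is reported in kB, so the value returned below is in MB.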
rss = 0
with open("/proc/{}/status".format(pid)) as f:
for row in f:
tmp = row.split()
if tmp[0] == "VmRSS:":
rss = int(tmp[1])
return rss / 1024
class Memgraph:
def __init__(self, memgraph_binary, temporary_dir, properties_on_edges, bolt_port):
def __init__(self, memgraph_binary, temporary_dir, properties_on_edges, bolt_port, performance_tracking):
self._memgraph_binary = memgraph_binary
self._directory = tempfile.TemporaryDirectory(dir=temporary_dir)
self._properties_on_edges = properties_on_edges
self._proc_mg = None
self._bolt_port = bolt_port
self.performance_tracking = performance_tracking
self._stop_event = threading.Event()
self._rss = []
atexit.register(self._cleanup)
# Determine Memgraph version
@ -103,25 +118,249 @@ class Memgraph:
self._proc_mg = None
return ret, usage
def start_preparation(self):
if self._memgraph_version >= (0, 50, 0):
self._start(storage_snapshot_on_exit=True)
else:
self._start(snapshot_on_exit=True)
def start_preparation(self, workload):
if self.performance_tracking:
p = threading.Thread(target=self.res_background_tracking, args=(self._rss, self._stop_event))
self._stop_event.clear()
self._rss.clear()
p.start()
self._start(storage_snapshot_on_exit=True)
def start_benchmark(self):
# TODO: support custom benchmarking config files!
if self._memgraph_version >= (0, 50, 0):
self._start(storage_recover_on_startup=True)
else:
self._start(db_recover_on_startup=True)
def start_benchmark(self, workload):
if self.performance_tracking:
p = threading.Thread(target=self.res_background_tracking, args=(self._rss, self._stop_event))
self._stop_event.clear()
self._rss.clear()
p.start()
self._start(storage_recover_on_startup=True)
def stop(self):
def res_background_tracking(self, res, stop_event):
print("Started rss tracking.")
while not stop_event.is_set():
if self._proc_mg is not None:
self._rss.append(_get_current_usage(self._proc_mg.pid))
time.sleep(0.05)
print("Stopped rss tracking. ")
def dump_rss(self, workload):
file_name = workload + "_rss"
Path.mkdir(Path().cwd() / "memgraph_memory", exist_ok=True)
file = Path(Path().cwd() / "memgraph_memory" / file_name)
# Open in "w" so a leftover file from a previous run is truncated instead of partially overwritten.
with file.open("w") as f:
for rss in self._rss:
f.write(str(rss))
f.write("\n")
def stop(self, workload):
if self.performance_tracking:
self._stop_event.set()
self.dump_rss(workload)
ret, usage = self._cleanup()
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
return usage
class Neo4j:
def __init__(self, neo4j_path, temporary_dir, bolt_port, performance_tracking):
self._neo4j_path = Path(neo4j_path)
self._neo4j_binary = Path(neo4j_path) / "bin" / "neo4j"
self._neo4j_config = Path(neo4j_path) / "conf" / "neo4j.conf"
self._neo4j_pid = Path(neo4j_path) / "run" / "neo4j.pid"
self._neo4j_admin = Path(neo4j_path) / "bin" / "neo4j-admin"
self.performance_tracking = performance_tracking
self._stop_event = threading.Event()
self._rss = []
if not self._neo4j_binary.is_file():
raise Exception("Wrong path to binary!")
self._directory = tempfile.TemporaryDirectory(dir=temporary_dir)
self._bolt_port = bolt_port
atexit.register(self._cleanup)
configs = []
memory_flag = "server.jvm.additional=-XX:NativeMemoryTracking=detail"
auth_flag = "dbms.security.auth_enabled=false"
if self.performance_tracking:
configs.append(memory_flag)
else:
# Remove a previously added native memory tracking flag from neo4j.conf.
with self._neo4j_config.open("r") as file:
lines = file.readlines()
for i in range(len(lines)):
if lines[i].strip("\n") == memory_flag:
print("Clearing config flag: " + memory_flag)
lines[i] = "\n"
with self._neo4j_config.open("w") as file:
file.writelines(lines)
configs.append(auth_flag)
print("Check neo4j config flags:")
for conf in configs:
with self._neo4j_config.open("r+") as file:
lines = file.readlines()
line_exist = False
for line in lines:
if conf == line.rstrip():
line_exist = True
print("Config line exist at line: " + str(lines.index(line)))
print("Line content: " + line)
file.close()
break
if not line_exist:
print("Setting config line: " + conf)
file.write(conf)
file.write("\n")
file.close()
def __del__(self):
self._cleanup()
atexit.unregister(self._cleanup)
def _start(self, **kwargs):
if self._neo4j_pid.exists():
raise Exception("The database process is already running!")
args = _convert_args_to_flags(self._neo4j_binary, "start", **kwargs)
subprocess.run(args, check=True)
time.sleep(5)  # give the daemon a moment to write its pid file
if self._neo4j_pid.exists():
print("Neo4j started!")
else:
raise Exception("The database process died prematurely!")
print("Run server check:")
wait_for_server(self._bolt_port)
def _cleanup(self):
if self._neo4j_pid.exists():
pid = self._neo4j_pid.read_text()
print("Clean up: " + pid)
usage = _get_usage(pid)
exit_proc = subprocess.run(args=[self._neo4j_binary, "stop"], capture_output=True, check=True)
return exit_proc.returncode, usage
else:
# Keep the return shape consistent with the running case so callers can always unpack `ret, usage`.
return 0, {"cpu": 0, "memory": 0}
def start_preparation(self, workload):
if self.performance_tracking:
p = threading.Thread(target=self.res_background_tracking, args=(self._rss, self._stop_event))
self._stop_event.clear()
self._rss.clear()
p.start()
# Start DB
self._start()
if self.performance_tracking:
self.get_memory_usage("start_" + workload)
def start_benchmark(self, workload):
if self.performance_tracking:
p = threading.Thread(target=self.res_background_tracking, args=(self._rss, self._stop_event))
self._stop_event.clear()
self._rss.clear()
p.start()
# Start DB
self._start()
if self.performance_tracking:
self.get_memory_usage("start_" + workload)
def dump_db(self, path):
print("Dumping the neo4j database...")
if self._neo4j_pid.exists():
raise Exception("Cannot dump DB because it is running.")
else:
subprocess.run(
args=[
self._neo4j_admin,
"database",
"dump",
"--overwrite-destination=false",
"--to-path",
path,
"neo4j",
],
check=True,
)
def load_db_from_dump(self, path):
print("Loading the neo4j database from dump...")
if self._neo4j_pid.exists():
raise Exception("Cannot dump DB because it is running.")
else:
subprocess.run(
args=[
self._neo4j_admin,
"database",
"load",
"--from-path=" + path,
"--overwrite-destination=true",
"neo4j",
],
check=True,
)
def res_background_tracking(self, res, stop_event):
print("Started rss tracking.")
while not stop_event.is_set():
if self._neo4j_pid.exists():
pid = self._neo4j_pid.read_text()
self._rss.append(_get_current_usage(pid))
time.sleep(0.05)
print("Stopped rss tracking. ")
def is_stopped(self):
return not self._neo4j_pid.exists()
def stop(self, workload):
if self.performance_tracking:
self._stop_event.set()
self.get_memory_usage("stop_" + workload)
self.dump_rss(workload)
ret, usage = self._cleanup()
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
return usage
def dump_rss(self, workload):
file_name = workload + "_rss"
Path.mkdir(Path().cwd() / "neo4j_memory", exist_ok=True)
file = Path(Path().cwd() / "neo4j_memory" / file_name)
# Open in "w" so a leftover file from a previous run is truncated instead of partially overwritten.
with file.open("w") as f:
for rss in self._rss:
f.write(str(rss))
f.write("\n")
def get_memory_usage(self, workload):
Path.mkdir(Path().cwd() / "neo4j_memory", exist_ok=True)
pid = self._neo4j_pid.read_text()
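# `jcmd <pid> VM.native_memory` only reports data when the JVM was started with
# -XX:NativeMemoryTracking enabled, which is what the config flag added in
# __init__ takes care of when performance tracking is on.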
memory_usage = subprocess.run(args=["jcmd", pid, "VM.native_memory"], capture_output=True, text=True)
file = Path(Path().cwd() / "neo4j_memory" / workload)
if file.exists():
with file.open("r+") as f:
f.write(memory_usage.stdout)
f.close()
else:
file.touch()
with file.open("r+") as f:
f.write(memory_usage.stdout)
f.close()
class Client:
def __init__(
self, client_binary: str, temporary_directory: str, bolt_port: int, username: str = "", password: str = ""
@ -159,7 +398,7 @@ class Client:
password=self._password,
port=self._bolt_port,
)
ret = subprocess.run(args, stdout=subprocess.PIPE, check=True)
ret = subprocess.run(args, capture_output=True, check=True)
data = ret.stdout.decode("utf-8").strip().split("\n")
# data = [x for x in data if not x.startswith("[")]
return list(map(json.loads, data))