diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml
index 605783495..170f91edf 100644
--- a/.github/workflows/diff.yaml
+++ b/.github/workflows/diff.yaml
@@ -437,7 +437,7 @@ jobs:
- name: Run mgbench
run: |
cd tests/mgbench
- ./benchmark.py --num-workers-for-benchmark 12 --export-results benchmark_result.json pokec/medium/*/*
+ ./benchmark.py vendor-native --num-workers-for-benchmark 12 --export-results benchmark_result.json pokec/medium/*/*
- name: Upload mgbench results
run: |
diff --git a/.github/workflows/release_mgbench_client.yaml b/.github/workflows/release_mgbench_client.yaml
new file mode 100644
index 000000000..2abdad2b9
--- /dev/null
+++ b/.github/workflows/release_mgbench_client.yaml
@@ -0,0 +1,63 @@
+name: "Mgbench Bolt Client Publish Docker Image"
+
+on:
+ workflow_dispatch:
+ inputs:
+ version:
+ description: "Mgbench bolt client version to publish on Dockerhub."
+ required: true
+ force_release:
+ type: boolean
+ required: false
+ default: false
+
+jobs:
+ mgbench_docker_publish:
+ runs-on: ubuntu-latest
+ env:
+ DOCKER_ORGANIZATION_NAME: memgraph
+ DOCKER_REPOSITORY_NAME: mgbench-client
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v2
+
+ - name: Set up Docker Buildx
+ id: buildx
+ uses: docker/setup-buildx-action@v2
+
+ - name: Log in to Docker Hub
+ uses: docker/login-action@v2
+ with:
+ username: ${{ secrets.DOCKERHUB_USERNAME }}
+ password: ${{ secrets.DOCKERHUB_TOKEN }}
+
+ - name: Check if specified version is already pushed
+ run: |
+ EXISTS=$(docker manifest inspect $DOCKER_ORGANIZATION_NAME/$DOCKER_REPOSITORY_NAME:${{ github.event.inputs.version }} > /dev/null; echo $?)
+ echo $EXISTS
+ if [[ ${EXISTS} -eq 0 ]]; then
+ echo 'The specified version has been already released to DockerHub.'
+ if [[ ${{ github.event.inputs.force_release }} = true ]]; then
+ echo 'Forcing the release!'
+ else
+ echo 'Stopping the release!'
+ exit 1
+ fi
+ else
+ echo 'All good the specified version has not been release to DockerHub.'
+ fi
+
+ - name: Build & push docker images
+ run: |
+ cd tests/mgbench
+ docker buildx build \
+ --build-arg TOOLCHAIN_VERSION=toolchain-v4 \
+ --platform linux/amd64,linux/arm64 \
+ --tag $DOCKER_ORGANIZATION_NAME/$DOCKER_REPOSITORY_NAME:${{ github.event.inputs.version }} \
+ --tag $DOCKER_ORGANIZATION_NAME/$DOCKER_REPOSITORY_NAME:latest \
+ --file Dockerfile.mgbench_client \
+ --push .
diff --git a/libs/setup.sh b/libs/setup.sh
index 14ad2b4b9..fda10c24a 100755
--- a/libs/setup.sh
+++ b/libs/setup.sh
@@ -117,7 +117,7 @@ declare -A primary_urls=(
["mgconsole"]="http://$local_cache_host/git/mgconsole.git"
["spdlog"]="http://$local_cache_host/git/spdlog"
["nlohmann"]="http://$local_cache_host/file/nlohmann/json/4f8fba14066156b73f1189a2b8bd568bde5284c5/single_include/nlohmann/json.hpp"
- ["neo4j"]="http://$local_cache_host/file/neo4j-community-3.2.3-unix.tar.gz"
+ ["neo4j"]="http://$local_cache_host/file/neo4j-community-5.6.0-unix.tar.gz"
["librdkafka"]="http://$local_cache_host/git/librdkafka.git"
["protobuf"]="http://$local_cache_host/git/protobuf.git"
["pulsar"]="http://$local_cache_host/git/pulsar.git"
@@ -142,7 +142,7 @@ declare -A secondary_urls=(
["mgconsole"]="http://github.com/memgraph/mgconsole.git"
["spdlog"]="https://github.com/gabime/spdlog"
["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/4f8fba14066156b73f1189a2b8bd568bde5284c5/single_include/nlohmann/json.hpp"
- ["neo4j"]="https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/neo4j-community-3.2.3-unix.tar.gz"
+ ["neo4j"]="https://dist.neo4j.org/neo4j-community-5.6.0-unix.tar.gz"
["librdkafka"]="https://github.com/edenhill/librdkafka.git"
["protobuf"]="https://github.com/protocolbuffers/protobuf.git"
["pulsar"]="https://github.com/apache/pulsar.git"
@@ -180,9 +180,9 @@ repo_clone_try_double "${primary_urls[libbcrypt]}" "${secondary_urls[libbcrypt]}
# neo4j
file_get_try_double "${primary_urls[neo4j]}" "${secondary_urls[neo4j]}"
-tar -xzf neo4j-community-3.2.3-unix.tar.gz
-mv neo4j-community-3.2.3 neo4j
-rm neo4j-community-3.2.3-unix.tar.gz
+tar -xzf neo4j-community-5.6.0-unix.tar.gz
+mv neo4j-community-5.6.0 neo4j
+rm neo4j-community-5.6.0-unix.tar.gz
# nlohmann json
# We wget header instead of cloning repo since repo is huge (lots of test data).
diff --git a/licenses/third-party/ldbc/LICENSE b/licenses/third-party/ldbc/LICENSE
new file mode 100644
index 000000000..d64569567
--- /dev/null
+++ b/licenses/third-party/ldbc/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/tests/mgbench/.gitignore b/tests/mgbench/.gitignore
index 16d3c4dbb..9aac9a807 100644
--- a/tests/mgbench/.gitignore
+++ b/tests/mgbench/.gitignore
@@ -1 +1,6 @@
.cache
+*.zip
+*.log
+*.report
+*.sysinfo
+*.json
diff --git a/tests/mgbench/Dockerfile.mgbench_client b/tests/mgbench/Dockerfile.mgbench_client
new file mode 100644
index 000000000..ee04ab99c
--- /dev/null
+++ b/tests/mgbench/Dockerfile.mgbench_client
@@ -0,0 +1,42 @@
+FROM ubuntu:22.04 AS mg_bench_client_build_base
+
+SHELL ["/bin/bash", "-c"]
+ARG TOOLCHAIN_VERSION
+ARG TARGETARCH
+
+ENV DEBIAN_FRONTEND=noninteractive
+
+USER root
+
+RUN apt update && apt install -y \
+ ca-certificates wget git
+
+RUN wget -q https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/${TOOLCHAIN_VERSION}/${TOOLCHAIN_VERSION}-binaries-ubuntu-22.04-${TARGETARCH}.tar.gz \
+ -O ${TOOLCHAIN_VERSION}-binaries-ubuntu-22.04-${TARGETARCH}.tar.gz \
+ && tar xzvf ${TOOLCHAIN_VERSION}-binaries-ubuntu-22.04-${TARGETARCH}.tar.gz -C /opt
+
+RUN git clone https://github.com/memgraph/memgraph.git
+
+WORKDIR memgraph
+
+RUN if [ ${TARGETARCH} = "amd64" ] ; then ./environment/os/ubuntu-22.04.sh install TOOLCHAIN_RUN_DEPS ; else ./environment/os/ubuntu-22.04-arm.sh install TOOLCHAIN_RUN_DEPS ; fi
+RUN if [ ${TARGETARCH} = "amd64" ] ; then ./environment/os/ubuntu-22.04.sh install MEMGRAPH_BUILD_DEPS ; else ./environment/os/ubuntu-22.04-arm.sh install MEMGRAPH_BUILD_DEPS ; fi
+
+RUN source /opt/toolchain-v4/activate && \
+ ./init && \
+ rm -r build && \
+ mkdir build && \
+ cd build && \
+ cmake -DCMAKE_BUILD_TYPE=release .. && \
+ make -j$(nproc) memgraph__mgbench__client && \
+ make .
+
+
+FROM ubuntu:22.04
+
+RUN apt-get update && apt-get install -y wget libcurl4
+
+# Copy mgbench client to clean image
+COPY --from=mg_bench_client_build_base /memgraph/build/tests/mgbench/client /bin/
+
+ENTRYPOINT ["bin/client"]
diff --git a/tests/mgbench/README.md b/tests/mgbench/README.md
index 6c51e4850..2feb7817f 100644
--- a/tests/mgbench/README.md
+++ b/tests/mgbench/README.md
@@ -2,15 +2,15 @@
## :clipboard: Benchmark Overview
-mgBench is primarily designed to benchmark graph databases. To test graph database performance, this benchmark executes Cypher queries (write, read, update, aggregate, and analyze) on a given dataset. Queries are general and represent a typical workload that would be used to analyze any graph dataset. [BenchGraph](https://memgraph.com/benchgraph/) platform shows the results of running these queries on supported vendors. It shows the overall performance of each system relative to others.
+mgBench is primarily designed to benchmark graph databases (Currently, Neo4j and Memgraph). To test graph database performance, this benchmark executes Cypher queries that can write, read, update, aggregate, and analyze dataset present in database. There are some predefined queries and dataset in mgbench. The present datasets and queries represent a typical workload that would be used to analyze any graph dataset and are pure Cypher based. [BenchGraph](https://memgraph.com/benchgraph/) platform shows the results of running these queries on specified hardware and under certain conditions. It shows the overall performance of each system under test relative to other, best in test being the baseline.
-Three workload types can be executed:
+There is also a [tutorial on how to use mgbench](how_to_use_mgbench.md) to define your own workload and run workloads on supported vendors.
+
+Mgbench supports three workload types can be executed:
- Isolated - Concurrent execution of a single type of query,
- Mixed - Concurrent execution of a single type of query mixed with a certain percentage of queries from a designated query group,
- Realistic - Concurrent execution of queries from write, read, update and analyze groups.
-Currently, the benchmark is executed on the social media dataset Pokec, available in different sizes. The full list of queries and their grouping is available as [query list](#query-list).
-
This methodology is designed to be read from top to bottom to understand what is being tested and how, but feel free to jump to parts that interest you.
- [:fire: mgBench: Benchmark for graph databases](#fire-mgbench-benchmark-for-graph-databases)
@@ -44,7 +44,7 @@ This methodology is designed to be read from top to bottom to understand what is
### Reproducibility and validation
-Running this benchmark is automated, and the code used to run benchmarks is publicly available. You can [run mgBench](#running-the-benchmark) with default settings to validate the results at [BenchGraph platform](https://memgraph.com/benchgraph). The results may differ depending on the hardware, database configuration, and other variables involved in your setup. But if the results you get are significantly different, feel free to [open a GitHub issue](https://github.com/memgraph/memgraph/issues).
+Running this benchmark is automated, and the code used to run benchmarks is publicly available. You can [run mgBench](#running-the-benchmark) with default settings to validate the results at [BenchGraph platform](https://memgraph.com/benchgraph). The results may differ depending on the hardware, benchmark run configuration, database configuration, and other variables involved in your setup. But if the results you get are significantly different, feel free to [open a GitHub issue](https://github.com/memgraph/memgraph/issues).
In the future, the project will be expanded to include more platforms to see how systems perform on different OS and hardware configurations. If you are interested in what will be added and tested, read the section about [the future of mgBench](#future-of-mgbench)
@@ -53,7 +53,7 @@ In the future, the project will be expanded to include more platforms to see how
At the moment, support for graph databases is limited. To run the benchmarks, the graph database must support Cypher query language and the Bolt protocol.
-Using Cypher ensures that executed queries are identical or similar on every supported system. Possible differences are noted in [database notes](#database-notes). A single C++ client queries all database systems, and it is based on the Bolt protocol. Using a single client ensures minimal performance penalties from the client side and ensures fairness across different vendors.
+Using Cypher ensures that executed queries are identical or similar as possible on every supported system. A single C++ client queries database systems (Currently, Neo4j and Memgraph), and it is based on the Bolt protocol. Using a single client ensures minimal performance penalties from the client side and ensures fairness across different vendors.
If your database supports the given requirements, feel free to contribute and add your database to mgBench.
If your database does not support the mentioned requirements, follow the project because support for other languages and protocols in graph database space will be added.
@@ -89,11 +89,10 @@ Some configurational changes are necessary for test execution and are not consid
Benchmarking different systems is challenging because the setup, environment, queries, workload, and dataset can benefit specific database vendors. Each vendor may have a particularly strong use-case scenario. This benchmark aims to be neutral and fair to all database vendors. Acknowledging some of the current limitations can help understand the issues you might notice:
1. mgBench measures and tracks just a tiny subset of everything that can be tracked and compared during testing. Active benchmarking is strenuous because it requires a lot of time to set up and validate. Passive benchmarking is much faster to iterate on but can have a few bugs.
-2. Datasets and queries used for testing are simple. Datasets and queries in real-world environments can become quite complex. To avoid Cypher specifics, mgBench uses simple queries of different variates. Future versions will include more complex datasets and queries.
-3. The scale of the dataset used is miniature for production environments. Production environments can have up to trillions of nodes and edges.
+2. The scale of the dataset used is miniature for production environments. Production environments can have up to trillions of nodes and edges.
Query results are not verified or important. The queries might return different results, but only the performance is measured, not correctness.
-4. All tests are performed on single-node databases.
-5. Architecturally different systems can be set up and measured biasedly.
+3. All tests are performed on single-node databases.
+4. Architecturally different systems can be set up and measured biasedly.
## :wrench: mgBench
@@ -101,14 +100,14 @@ Query results are not verified or important. The queries might return different
Listed below are the main scripts used to run the benchmarks:
-- `benchmark.py` - Script that runs the queries and workloads.
-- `datasets.py` - Script that handles datasets and queries for workloads.
+- `benchmark.py` - Script that runs the single workload iteration of queries.
+- `workloads.py` - Base script that defines how dataset and queries are defined.
- `runners.py` - Script holding the configuration for different DB vendors.
- `client.cpp` - Client for querying the database.
-- `graph_bench.py` - Script that starts all predefined and custom-defined workloads.
--` compare_results.py` - Script that visually compares benchmark results.
+- `graph_bench.py` - Script that starts all predefined and custom-defined workloads thst.
+- `compare_results.py` - Script that visually compares benchmark results.
-Except for these scripts, the project also includes dataset files and index configuration files. Once the first test is executed, those files can be located in the newly generated .cache folder.
+Except for these scripts, the project also includes dataset files and index configuration files. Once the first test is executed, those files can be located in the newly generated `.cache` folder.
### Prerequisites
diff --git a/tests/mgbench/benchgraph.sh b/tests/mgbench/benchgraph.sh
new file mode 100755
index 000000000..15b1d73bf
--- /dev/null
+++ b/tests/mgbench/benchgraph.sh
@@ -0,0 +1,163 @@
+#!/bin/bash -e
+
+pushd () { command pushd "$@" > /dev/null; }
+popd () { command popd "$@" > /dev/null; }
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+pushd "$SCRIPT_DIR"
+
+print_help () {
+ echo -e "$0\t\t => runs all available benchmarks with the prompt"
+ echo -e "$0 run_all\t => runs all available benchmarks"
+ echo -e "$0 zip\t => packages all result files and info about the system"
+ echo -e "$0 clean\t => removes all result files including the zip"
+ echo -e "$0 -h\t => prints help"
+ echo ""
+ echo " env vars:"
+ echo " MGBENCH_MEMGRAPH_BIN_PATH -> path to the memgraph binary in the release mode"
+ echo " MGBENCH_NEO_BIN_PATH -> path to the neo4j binary"
+ exit 0
+}
+
+MG_PATH="${MGBENCH_MEMGRAPH_BIN_PATH:-$SCRIPT_DIR/../../build/memgraph}"
+NEO_PATH="${MGBENCH_NEO_BIN_PATH:-$SCRIPT_DIR/../../libs/neo4j/bin/neo4j}"
+# If you want to skip some of the workloads or workers, just comment lines
+# under the WORKLOADS or WORKERS variables.
+WORKLOADS=(
+ pokec_small
+ pokec_medium
+ ldbc_interactive_sf0_1
+ ldbc_interactive_sf1
+ ldbc_bi_sf1
+ ldbc_interactive_sf3
+ ldbc_bi_sf3
+)
+WORKERS=(
+ 24
+ 48
+)
+
+check_binary () {
+ binary_path=$1
+ if [ -f "$binary_path" ]; then
+ echo "$binary_path found."
+ else
+ echo "Failed to find $binary_path exiting..."
+ exit 1
+ fi
+}
+check_all_binaries () {
+ check_binary "$MG_PATH"
+ check_binary "$NEO_PATH"
+}
+
+pokec_small () {
+ workers=$1
+ echo "running ${FUNCNAME[0]} with $workers client workers"
+ python3 graph_bench.py --vendor memgraph "$MG_PATH" --vendor neo4j "$NEO_PATH" \
+ --dataset-name pokec --dataset-group basic --dataset-size small \
+ --realistic 500 30 70 0 0 \
+ --realistic 500 30 70 0 0 \
+ --realistic 500 50 50 0 0 \
+ --realistic 500 70 30 0 0 \
+ --realistic 500 30 40 10 20 \
+ --mixed 500 30 0 0 0 70 \
+ --num-workers-for-benchmark "$workers"
+}
+
+pokec_medium () {
+ workers=$1
+ echo "running ${FUNCNAME[0]} with $workers client workers"
+ python3 graph_bench.py --vendor memgraph "$MG_PATH" --vendor neo4j "$NEO_PATH" \
+ --dataset-name pokec --dataset-group basic --dataset-size medium \
+ --realistic 500 30 70 0 0 \
+ --realistic 500 30 70 0 0 \
+ --realistic 500 50 50 0 0 \
+ --realistic 500 70 30 0 0 \
+ --realistic 500 30 40 10 20 \
+ --mixed 500 30 0 0 0 70 \
+ --num-workers-for-benchmark "$workers"
+}
+
+ldbc_interactive_sf0_1 () {
+ workers=$1
+ echo "running ${FUNCNAME[0]} with $workers client workers"
+ python3 graph_bench.py --vendor memgraph "$MG_PATH" --vendor neo4j "$NEO_PATH" \
+ --dataset-name ldbc_interactive --dataset-group interactive --dataset-size sf0.1 \
+ --num-workers-for-benchmark "$workers"
+}
+
+ldbc_interactive_sf1 () {
+ workers=$1
+ echo "running ${FUNCNAME[0]} with $workers client workers"
+ python3 graph_bench.py --vendor memgraph "$MG_PATH" --vendor neo4j "$NEO_PATH" \
+ --dataset-name ldbc_interactive --dataset-group interactive --dataset-size sf1 \
+ --num-workers-for-benchmark "$workers"
+}
+
+ldbc_bi_sf1 () {
+ workers=$1
+ echo "running ${FUNCNAME[0]} with $workers client workers"
+ python3 graph_bench.py --vendor memgraph "$MG_PATH" --vendor neo4j "$NEO_PATH" \
+ --dataset-name ldbc_bi --dataset-group bi --dataset-size sf1 \
+ --num-workers-for-benchmark "$workers"
+}
+
+ldbc_interactive_sf3 () {
+ workers=$1
+ echo "running ${FUNCNAME[0]} with $workers client workers"
+ python3 graph_bench.py --vendor memgraph "$MG_PATH" --vendor neo4j "$NEO_PATH" \
+ --dataset-name ldbc_interactive --dataset-group interactive --dataset-size sf3 \
+ --num-workers-for-benchmark "$workers"
+}
+
+ldbc_bi_sf3 () {
+ workers=$1
+ echo "running ${FUNCNAME[0]} with $workers client workers"
+ python3 graph_bench.py --vendor memgraph "$MG_PATH" --vendor neo4j "$NEO_PATH" \
+ --dataset-name ldbc_bi --dataset-group bi --dataset-size sf3 \
+ --num-workers-for-benchmark "$workers"
+}
+
+run_all () {
+ for workload in "${WORKLOADS[@]}"; do
+ for workers in "${WORKERS[@]}"; do
+ $workload "$workers"
+ sleep 1
+ done
+ done
+}
+
+package_all_results () {
+ cat /proc/cpuinfo > cpu.sysinfo
+ cat /proc/meminfo > mem.sysinfo
+ zip data.zip ./*.json ./*.report ./*.log ./*.sysinfo
+}
+
+clean_all_results () {
+ rm data.zip ./*.json ./*.report ./*.log ./*.sysinfo
+}
+
+if [ "$#" -eq 0 ]; then
+ check_all_binaries
+ read -p "Run all benchmarks? y|Y for YES, anything else NO " -n 1 -r
+ if [[ $REPLY =~ ^[Yy]$ ]]; then
+ run_all
+ fi
+elif [ "$#" -eq 1 ]; then
+ case $1 in
+ run_all)
+ run_all
+ ;;
+ zip)
+ package_all_results
+ ;;
+ clean)
+ clean_all_results
+ ;;
+ *)
+ print_help
+ ;;
+ esac
+else
+ print_help
+fi
diff --git a/tests/mgbench/benchmark.py b/tests/mgbench/benchmark.py
index 376396650..69aa1a1ad 100755
--- a/tests/mgbench/benchmark.py
+++ b/tests/mgbench/benchmark.py
@@ -14,27 +14,28 @@
import argparse
import json
import multiprocessing
+import pathlib
import platform
import random
+import sys
import helpers
import log
import runners
+import setup
from benchmark_context import BenchmarkContext
from workloads import *
WITH_FINE_GRAINED_AUTHORIZATION = "with_fine_grained_authorization"
WITHOUT_FINE_GRAINED_AUTHORIZATION = "without_fine_grained_authorization"
-QUERY_COUNT_LOWER_BOUND = 30
def parse_args():
+ parser = argparse.ArgumentParser(description="Main parser.", add_help=False)
- parser = argparse.ArgumentParser(
- description="Memgraph benchmark executor.",
- formatter_class=argparse.ArgumentDefaultsHelpFormatter,
- )
- parser.add_argument(
+ benchmark_parser = argparse.ArgumentParser(description="Benchmark arguments parser", add_help=False)
+
+ benchmark_parser.add_argument(
"benchmarks",
nargs="*",
default=None,
@@ -48,80 +49,65 @@ def parse_args():
"the default group is '*' which selects all groups; the"
"default query is '*' which selects all queries",
)
- parser.add_argument(
- "--vendor-binary",
- help="Vendor binary used for benchmarking, by default it is memgraph",
- default=helpers.get_binary_path("memgraph"),
- )
- parser.add_argument(
- "--vendor-name",
- default="memgraph",
- choices=["memgraph", "neo4j"],
- help="Input vendor binary name (memgraph, neo4j)",
- )
- parser.add_argument(
- "--client-binary",
- default=helpers.get_binary_path("tests/mgbench/client"),
- help="Client binary used for benchmarking",
- )
- parser.add_argument(
+ benchmark_parser.add_argument(
"--num-workers-for-import",
type=int,
default=multiprocessing.cpu_count() // 2,
help="number of workers used to import the dataset",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--num-workers-for-benchmark",
type=int,
default=1,
help="number of workers used to execute the benchmark",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--single-threaded-runtime-sec",
type=int,
default=10,
help="single threaded duration of each query",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
+ "--query-count-lower-bound",
+ type=int,
+ default=30,
+ help="Lower bound for query count, minimum number of queries that will be executed. If approximated --single-threaded-runtime-sec query count is lower than this value, lower bound is used.",
+ )
+ benchmark_parser.add_argument(
"--no-load-query-counts",
action="store_true",
default=False,
help="disable loading of cached query counts",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--no-save-query-counts",
action="store_true",
default=False,
help="disable storing of cached query counts",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--export-results",
default=None,
help="file path into which results should be exported",
)
- parser.add_argument(
- "--temporary-directory",
- default="/tmp",
- help="directory path where temporary data should be stored",
- )
- parser.add_argument(
+ benchmark_parser.add_argument(
"--no-authorization",
action="store_false",
default=True,
help="Run each query with authorization",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--warm-up",
default="cold",
choices=["cold", "hot", "vulcanic"],
help="Run different warmups before benchmarks sample starts",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--workload-realistic",
nargs="*",
type=int,
@@ -134,7 +120,7 @@ def parse_args():
70% read, 10% update and 0% analytical.""",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--workload-mixed",
nargs="*",
type=int,
@@ -147,29 +133,66 @@ def parse_args():
with the presence of 300 write queries from write type or 30%""",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--time-depended-execution",
type=int,
default=0,
help="Execute defined number of queries (based on single-threaded-runtime-sec) for a defined duration in of wall-clock time",
)
- parser.add_argument(
+ benchmark_parser.add_argument(
"--performance-tracking",
action="store_true",
default=False,
help="Flag for runners performance tracking, this logs RES through time and vendor specific performance tracking.",
)
- parser.add_argument("--customer-workloads", default=None, help="Path to customers workloads")
+ benchmark_parser.add_argument("--customer-workloads", default=None, help="Path to customers workloads")
- parser.add_argument(
+ benchmark_parser.add_argument(
"--vendor-specific",
nargs="*",
default=[],
help="Vendor specific arguments that can be applied to each vendor, format: [key=value, key=value ...]",
)
+ subparsers = parser.add_subparsers(help="Subparsers", dest="run_option")
+
+ # Vendor native parser starts here
+ parser_vendor_native = subparsers.add_parser(
+ "vendor-native",
+ help="Running database in binary native form",
+ parents=[benchmark_parser],
+ )
+ parser_vendor_native.add_argument(
+ "--vendor-name",
+ default="memgraph",
+ choices=["memgraph", "neo4j"],
+ help="Input vendor binary name (memgraph, neo4j)",
+ )
+ parser_vendor_native.add_argument(
+ "--vendor-binary",
+ help="Vendor binary used for benchmarking, by default it is memgraph",
+ default=helpers.get_binary_path("memgraph"),
+ )
+
+ parser_vendor_native.add_argument(
+ "--client-binary",
+ default=helpers.get_binary_path("tests/mgbench/client"),
+ help="Client binary used for benchmarking",
+ )
+
+ # Vendor docker parsers starts here
+ parser_vendor_docker = subparsers.add_parser(
+ "vendor-docker", help="Running database in docker", parents=[benchmark_parser]
+ )
+ parser_vendor_docker.add_argument(
+ "--vendor-name",
+ default="memgraph",
+ choices=["memgraph-docker", "neo4j-docker"],
+ help="Input vendor name to run in docker (memgraph-docker, neo4j-docker)",
+ )
+
return parser.parse_args()
@@ -184,28 +207,28 @@ def get_queries(gen, count):
def warmup(condition: str, client: runners.BaseRunner, queries: list = None):
- log.log("Database condition {} ".format(condition))
+ log.init("Started warm-up procedure to match database condition: {} ".format(condition))
if condition == "hot":
- log.log("Execute warm-up to match condition {} ".format(condition))
+ log.log("Execute warm-up to match condition: {} ".format(condition))
client.execute(
queries=[
("CREATE ();", {}),
("CREATE ()-[:TempEdge]->();", {}),
- ("MATCH (n) RETURN n LIMIT 1;", {}),
+ ("MATCH (n) RETURN count(n.prop) LIMIT 1;", {}),
],
num_workers=1,
)
elif condition == "vulcanic":
- log.log("Execute warm-up to match condition {} ".format(condition))
+ log.log("Execute warm-up to match condition: {} ".format(condition))
client.execute(queries=queries)
else:
- log.log("No warm-up on condition {} ".format(condition))
+ log.log("No warm-up on condition: {} ".format(condition))
+ log.log("Finished warm-up procedure to match database condition: {} ".format(condition))
def mixed_workload(
vendor: runners.BaseRunner, client: runners.BaseClient, dataset, group, queries, benchmark_context: BenchmarkContext
):
-
num_of_queries = benchmark_context.mode_config[0]
percentage_distribution = benchmark_context.mode_config[1:]
if sum(percentage_distribution) != 100:
@@ -233,7 +256,7 @@ def mixed_workload(
"analytical": [],
}
- for (_, funcname) in queries[group]:
+ for _, funcname in queries[group]:
for key in queries_by_type.keys():
if key in funcname:
queries_by_type[key].append(funcname)
@@ -252,8 +275,7 @@ def mixed_workload(
full_workload = []
log.info(
- "Running query in mixed workload:",
- "{}/{}/{}".format(
+ "Running query in mixed workload: {}/{}/{}".format(
group,
query,
funcname,
@@ -278,7 +300,7 @@ def mixed_workload(
additional_query = getattr(dataset, funcname)
full_workload.append(additional_query())
- vendor.start_benchmark(
+ vendor.start_db(
dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
)
warmup(benchmark_context.warm_up, client=client)
@@ -286,7 +308,7 @@ def mixed_workload(
queries=full_workload,
num_workers=benchmark_context.num_workers_for_benchmark,
)[0]
- usage_workload = vendor.stop(
+ usage_workload = vendor.stop_db(
dataset.NAME + dataset.get_variant() + "_" + "mixed" + "_" + query + "_" + config_distribution
)
@@ -313,13 +335,13 @@ def mixed_workload(
additional_query = getattr(dataset, funcname)
full_workload.append(additional_query())
- vendor.start_benchmark(dataset.NAME + dataset.get_variant() + "_" + "realistic" + "_" + config_distribution)
+ vendor.start_db(dataset.NAME + dataset.get_variant() + "_" + "realistic" + "_" + config_distribution)
warmup(benchmark_context.warm_up, client=client)
ret = client.execute(
queries=full_workload,
num_workers=benchmark_context.num_workers_for_benchmark,
)[0]
- usage_workload = vendor.stop(
+ usage_workload = vendor.stop_db(
dataset.NAME + dataset.get_variant() + "_" + "realistic" + "_" + config_distribution
)
mixed_workload = {
@@ -349,7 +371,6 @@ def get_query_cache_count(
config_key: list,
benchmark_context: BenchmarkContext,
):
-
cached_count = config.get_value(*config_key)
if cached_count is None:
log.info(
@@ -357,8 +378,9 @@ def get_query_cache_count(
benchmark_context.single_threaded_runtime_sec
)
)
+ log.log("Running query to prime the query cache...")
# First run to prime the query caches.
- vendor.start_benchmark("cache")
+ vendor.start_db("cache")
client.execute(queries=queries, num_workers=1)
# Get a sense of the runtime.
count = 1
@@ -379,11 +401,10 @@ def get_query_cache_count(
break
else:
count = count * 10
- vendor.stop("cache")
+ vendor.stop_db("cache")
- QUERY_COUNT_LOWER_BOUND = 30
- if count < QUERY_COUNT_LOWER_BOUND:
- count = QUERY_COUNT_LOWER_BOUND
+ if count < benchmark_context.query_count_lower_bound:
+ count = benchmark_context.query_count_lower_bound
config.set_value(
*config_key,
@@ -394,7 +415,7 @@ def get_query_cache_count(
)
else:
log.log(
- "Using cached query count of {} queries for {} seconds of single-threaded runtime.".format(
+ "Using cached query count of {} queries for {} seconds of single-threaded runtime to extrapolate .".format(
cached_count["count"], cached_count["duration"]
),
)
@@ -403,14 +424,10 @@ def get_query_cache_count(
if __name__ == "__main__":
-
args = parse_args()
vendor_specific_args = helpers.parse_kwargs(args.vendor_specific)
assert args.benchmarks != None, helpers.list_available_workloads()
- assert args.vendor_name == "memgraph" or args.vendor_name == "neo4j", "Unsupported vendors"
- assert args.vendor_binary != None, "Pass database binary for runner"
- assert args.client_binary != None, "Pass client binary for benchmark client "
assert args.num_workers_for_import > 0
assert args.num_workers_for_benchmark > 0
assert args.export_results != None, "Pass where will results be saved"
@@ -421,17 +438,21 @@ if __name__ == "__main__":
args.workload_realistic == None or args.workload_mixed == None
), "Cannot run both realistic and mixed workload, only one mode run at the time"
+ temp_dir = pathlib.Path.cwd() / ".temp"
+ temp_dir.mkdir(parents=True, exist_ok=True)
+
benchmark_context = BenchmarkContext(
benchmark_target_workload=args.benchmarks,
- vendor_binary=args.vendor_binary,
- vendor_name=args.vendor_name,
- client_binary=args.client_binary,
+ vendor_binary=args.vendor_binary if args.run_option == "vendor-native" else None,
+ vendor_name=args.vendor_name.replace("-", ""),
+ client_binary=args.client_binary if args.run_option == "vendor-native" else None,
num_workers_for_import=args.num_workers_for_import,
num_workers_for_benchmark=args.num_workers_for_benchmark,
single_threaded_runtime_sec=args.single_threaded_runtime_sec,
+ query_count_lower_bound=args.query_count_lower_bound,
no_load_query_counts=args.no_load_query_counts,
export_results=args.export_results,
- temporary_directory=args.temporary_directory,
+ temporary_directory=temp_dir.absolute(),
workload_mixed=args.workload_mixed,
workload_realistic=args.workload_realistic,
time_dependent_execution=args.time_depended_execution,
@@ -444,10 +465,16 @@ if __name__ == "__main__":
log.init("Executing benchmark with following arguments: ")
for key, value in benchmark_context.__dict__.items():
- log.log(str(key) + " : " + str(value))
+ log.log("{:<30} : {:<30}".format(str(key), str(value)))
+
+ log.init("Check requirements for running benchmark")
+ if setup.check_requirements(benchmark_context=benchmark_context):
+ log.success("Requirements satisfied... ")
+ else:
+ log.warning("Requirements not satisfied...")
+ sys.exit(1)
log.log("Creating cache folder for: dataset, configurations, indexes, results etc. ")
- # Create cache, config and results objects.
cache = helpers.Cache()
log.init("Folder in use: " + cache.get_default_cache_directory())
if not benchmark_context.no_load_query_counts:
@@ -457,15 +484,11 @@ if __name__ == "__main__":
config = helpers.RecursiveDict()
results = helpers.RecursiveDict()
- log.init("Creating vendor runner for DB: " + benchmark_context.vendor_name)
- vendor_runner = runners.BaseRunner.create(
- benchmark_context=benchmark_context,
- )
- log.log("Class in use: " + str(vendor_runner))
-
run_config = {
"vendor": benchmark_context.vendor_name,
"condition": benchmark_context.warm_up,
+ "num_workers_for_benchmark": benchmark_context.num_workers_for_benchmark,
+ "single_threaded_runtime_sec": benchmark_context.single_threaded_runtime_sec,
"benchmark_mode": benchmark_context.mode,
"benchmark_mode_config": benchmark_context.mode_config,
"platform": platform.platform(),
@@ -475,53 +498,78 @@ if __name__ == "__main__":
available_workloads = helpers.get_available_workloads(benchmark_context.customer_workloads)
- log.init("Currently available workloads: ")
- log.log(helpers.list_available_workloads(benchmark_context.customer_workloads))
-
# Filter out the workloads based on the pattern
target_workloads = helpers.filter_workloads(
available_workloads=available_workloads, benchmark_context=benchmark_context
)
+ if len(target_workloads) == 0:
+ log.error("No workloads matched the pattern: " + str(benchmark_context.benchmark_target_workload))
+ log.error("Please check the pattern and workload NAME property, query group and query name.")
+ log.info("Currently available workloads: ")
+ log.log(helpers.list_available_workloads(benchmark_context.customer_workloads))
+ sys.exit(1)
+
# Run all target workloads.
for workload, queries in target_workloads:
log.info("Started running following workload: " + str(workload.NAME))
+ benchmark_context.set_active_workload(workload.NAME)
+ benchmark_context.set_active_variant(workload.get_variant())
+
+ log.init("Creating vendor runner for DB: " + benchmark_context.vendor_name)
+ vendor_runner = runners.BaseRunner.create(
+ benchmark_context=benchmark_context,
+ )
+ log.log("Class in use: " + str(vendor_runner.__class__.__name__))
+
log.info("Cleaning the database from any previous data")
vendor_runner.clean_db()
client = vendor_runner.fetch_client()
- log.log("Get appropriate client for vendor " + str(client))
+ log.log("Get appropriate client for vendor " + str(client.__class__.__name__))
ret = None
usage = None
- log.init("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
- workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
generated_queries = workload.dataset_generator()
if generated_queries:
- vendor_runner.start_preparation("import")
+ print("\n")
log.info("Using workload as dataset generator...")
- if workload.get_index():
- log.info("Using index from specified file: {}".format(workload.get_index()))
- client.execute(file_path=workload.get_index(), num_workers=benchmark_context.num_workers_for_import)
- else:
- log.warning("Make sure proper indexes/constraints are created in generated queries!")
+
+ vendor_runner.start_db_init("import")
+
+ log.warning("Using following indexes...")
+ log.info(workload.indexes_generator())
+ log.info("Executing database index setup...")
+ ret = client.execute(queries=workload.indexes_generator(), num_workers=1)
+ log.log("Finished setting up indexes...")
+ for row in ret:
+ log.success(
+ "Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format(
+ row["count"], row["duration"], row["num_workers"], row["throughput"]
+ )
+ )
+
+ log.info("Importing dataset...")
ret = client.execute(queries=generated_queries, num_workers=benchmark_context.num_workers_for_import)
- usage = vendor_runner.stop("import")
+ log.log("Finished importing dataset...")
+ usage = vendor_runner.stop_db_init("import")
else:
+ log.init("Preparing workload: " + workload.NAME + "/" + workload.get_variant())
+ workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
log.info("Using workload dataset information for import...")
imported = workload.custom_import()
if not imported:
log.log("Basic import execution")
- vendor_runner.start_preparation("import")
+ vendor_runner.start_db_init("import")
log.log("Executing database index setup...")
- client.execute(file_path=workload.get_index(), num_workers=benchmark_context.num_workers_for_import)
+ client.execute(file_path=workload.get_index(), num_workers=1)
log.log("Importing dataset...")
ret = client.execute(
file_path=workload.get_file(), num_workers=benchmark_context.num_workers_for_import
)
- usage = vendor_runner.stop("import")
+ usage = vendor_runner.stop_db_init("import")
else:
log.info("Custom import executed...")
@@ -531,7 +579,7 @@ if __name__ == "__main__":
# Display import statistics.
for row in ret:
log.success(
- "Executed {} queries in {} seconds using {} workers with a total throughput of {} + Q/S.".format(
+ "Executed {} queries in {} seconds using {} workers with a total throughput of {} Q/S.".format(
row["count"], row["duration"], row["num_workers"], row["throughput"]
)
)
@@ -539,7 +587,7 @@ if __name__ == "__main__":
log.success(
"The database used {} seconds of CPU time and peaked at {} MiB of RAM".format(
usage["cpu"], usage["memory"] / 1024 / 1024
- ),
+ )
)
results.set_value(*import_key, value={"client": ret, "database": usage})
@@ -548,6 +596,7 @@ if __name__ == "__main__":
# Run all benchmarks in all available groups.
for group in sorted(queries.keys()):
+ print("\n")
log.init("Running benchmark in " + benchmark_context.mode)
if benchmark_context.mode == "Mixed":
mixed_workload(vendor_runner, client, workload, group, queries, benchmark_context)
@@ -568,12 +617,13 @@ if __name__ == "__main__":
group,
query,
]
+ log.init("Determining query count for benchmark based on --single-threaded-runtime argument")
count = get_query_cache_count(
vendor_runner, client, get_queries(func, 1), config_key, benchmark_context
)
-
# Benchmark run.
- log.info("Sample query:{}".format(get_queries(func, 1)[0][0]))
+ sample_query = get_queries(func, 1)[0][0]
+ log.info("Sample query:{}".format(sample_query))
log.log(
"Executing benchmark with {} queries that should yield a single-threaded runtime of {} seconds.".format(
count, benchmark_context.single_threaded_runtime_sec
@@ -584,11 +634,11 @@ if __name__ == "__main__":
benchmark_context.num_workers_for_benchmark
)
)
- vendor_runner.start_benchmark(
+ vendor_runner.start_db(
workload.NAME + workload.get_variant() + "_" + "_" + benchmark_context.mode + "_" + query
)
-
warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
+ log.init("Executing benchmark queries...")
if benchmark_context.time_dependent_execution != 0:
ret = client.execute(
queries=get_queries(func, count),
@@ -600,8 +650,8 @@ if __name__ == "__main__":
queries=get_queries(func, count),
num_workers=benchmark_context.num_workers_for_benchmark,
)[0]
-
- usage = vendor_runner.stop(
+ log.info("Benchmark execution finished...")
+ usage = vendor_runner.stop_db(
workload.NAME + workload.get_variant() + "_" + benchmark_context.mode + "_" + query
)
ret["database"] = usage
@@ -610,15 +660,28 @@ if __name__ == "__main__":
log.log("Executed {} queries in {} seconds.".format(ret["count"], ret["duration"]))
log.log("Queries have been retried {} times".format(ret["retries"]))
log.log("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
- log.log("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
- log.log("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
- metadata = ret["metadata"]
- for key in sorted(metadata.keys()):
- log.log(
- "{name:>30}: {minimum:>20.06f} {average:>20.06f} "
- "{maximum:>20.06f}".format(name=key, **metadata[key])
- )
+ log.info("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
+ if "docker" not in benchmark_context.vendor_name:
+ log.log("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
+ metadata = ret["metadata"]
+ for key in sorted(metadata.keys()):
+ log.log(
+ "{name:>30}: {minimum:>20.06f} {average:>20.06f} "
+ "{maximum:>20.06f}".format(name=key, **metadata[key])
+ )
+ print("\n")
+ log.info("Result:")
+ log.info(funcname)
+ log.info(sample_query)
+ log.success("Latency statistics:")
+ for key, value in ret["latency_stats"].items():
+ if key == "iterations":
+ log.success("{:<10} {:>10}".format(key, value))
+ else:
+ log.success("{:<10} {:>10.06f} seconds".format(key, value))
+
log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
+ print("\n\n")
# Save results.
results_key = [
@@ -632,8 +695,9 @@ if __name__ == "__main__":
# If there is need for authorization testing.
if benchmark_context.no_authorization:
- log.info("Running queries with authorization...")
- vendor_runner.start_benchmark("authorization")
+ log.init("Running queries with authorization...")
+ log.info("Setting USER and PRIVILEGES...")
+ vendor_runner.start_db("authorization")
client.execute(
queries=[
("CREATE USER user IDENTIFIED BY 'test';", {}),
@@ -644,13 +708,11 @@ if __name__ == "__main__":
)
client.set_credentials(username="user", password="test")
- vendor_runner.stop("authorization")
+ vendor_runner.stop_db("authorization")
for query, funcname in queries[group]:
-
- log.info(
- "Running query:",
- "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION),
+ log.init(
+ "Running query:" + "{}/{}/{}/{}".format(group, query, funcname, WITH_FINE_GRAINED_AUTHORIZATION)
)
func = getattr(workload, funcname)
@@ -664,13 +726,14 @@ if __name__ == "__main__":
vendor_runner, client, get_queries(func, 1), config_key, benchmark_context
)
- vendor_runner.start_benchmark("authorization")
+ vendor_runner.start_db("authorization")
warmup(condition=benchmark_context.warm_up, client=client, queries=get_queries(func, count))
+
ret = client.execute(
queries=get_queries(func, count),
num_workers=benchmark_context.num_workers_for_benchmark,
)[0]
- usage = vendor_runner.stop("authorization")
+ usage = vendor_runner.stop_db("authorization")
ret["database"] = usage
# Output summary.
log.log("Executed {} queries in {} seconds.".format(ret["count"], ret["duration"]))
@@ -695,8 +758,8 @@ if __name__ == "__main__":
]
results.set_value(*results_key, value=ret)
- # Clean up database from any roles and users job
- vendor_runner.start_benchmark("authorizations")
+ log.info("Deleting USER and PRIVILEGES...")
+ vendor_runner.start_db("authorizations")
ret = client.execute(
queries=[
("REVOKE LABELS * FROM user;", {}),
@@ -704,7 +767,7 @@ if __name__ == "__main__":
("DROP USER user;", {}),
]
)
- vendor_runner.stop("authorization")
+ vendor_runner.stop_db("authorization")
# Save configuration.
if not benchmark_context.no_save_query_counts:
@@ -714,3 +777,30 @@ if __name__ == "__main__":
if benchmark_context.export_results:
with open(benchmark_context.export_results, "w") as f:
json.dump(results.get_data(), f)
+
+ # Results summary.
+ log.init("~" * 45)
+ log.info("Benchmark finished.")
+ log.init("~" * 45)
+ log.log("\n")
+ log.summary("Benchmark summary")
+ log.log("-" * 90)
+ log.summary("{:<20} {:>30} {:>30}".format("Query name", "Throughput", "Peak Memory usage"))
+ with open(benchmark_context.export_results, "r") as f:
+ results = json.load(f)
+ for dataset, variants in results.items():
+ if dataset == "__run_configuration__":
+ continue
+ for variant, groups in variants.items():
+ for group, queries in groups.items():
+ if group == "__import__":
+ continue
+ for query, auth in queries.items():
+ for key, value in auth.items():
+ log.log("-" * 90)
+ log.summary(
+ "{:<20} {:>26.2f} QPS {:>27.2f} MB".format(
+ query, value["throughput"], value["database"]["memory"] / 1024.0 / 1024.0
+ )
+ )
+ log.log("-" * 90)
diff --git a/tests/mgbench/benchmark_context.py b/tests/mgbench/benchmark_context.py
index a01f253ce..71f507fdc 100644
--- a/tests/mgbench/benchmark_context.py
+++ b/tests/mgbench/benchmark_context.py
@@ -1,3 +1,15 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+
# Describes all the information of single benchmark.py run.
class BenchmarkContext:
"""
@@ -7,12 +19,13 @@ class BenchmarkContext:
def __init__(
self,
benchmark_target_workload: str = None, # Workload that needs to be executed (dataset/variant/group/query)
- vendor_binary: str = None, # Benchmark vendor binary
+ vendor_binary: str = None,
vendor_name: str = None,
client_binary: str = None,
num_workers_for_import: int = None,
num_workers_for_benchmark: int = None,
single_threaded_runtime_sec: int = 0,
+ query_count_lower_bound: int = 0,
no_load_query_counts: bool = False,
no_save_query_counts: bool = False,
export_results: str = None,
@@ -26,7 +39,6 @@ class BenchmarkContext:
customer_workloads: str = None,
vendor_args: dict = {},
) -> None:
-
self.benchmark_target_workload = benchmark_target_workload
self.vendor_binary = vendor_binary
self.vendor_name = vendor_name
@@ -34,6 +46,7 @@ class BenchmarkContext:
self.num_workers_for_import = num_workers_for_import
self.num_workers_for_benchmark = num_workers_for_benchmark
self.single_threaded_runtime_sec = single_threaded_runtime_sec
+ self.query_count_lower_bound = query_count_lower_bound
self.no_load_query_counts = no_load_query_counts
self.no_save_query_counts = no_save_query_counts
self.export_results = export_results
@@ -55,3 +68,17 @@ class BenchmarkContext:
self.no_authorization = no_authorization
self.customer_workloads = customer_workloads
self.vendor_args = vendor_args
+ self.active_workload = None
+ self.active_variant = None
+
+ def set_active_workload(self, workload: str) -> None:
+ self.active_workload = workload
+
+ def get_active_workload(self) -> str:
+ return self.active_workload
+
+ def set_active_variant(self, variant: str) -> None:
+ self.active_variant = variant
+
+ def get_active_variant(self) -> str:
+ return self.active_variant
diff --git a/tests/mgbench/compare_results.py b/tests/mgbench/compare_results.py
index 65c77fc3f..0762fd605 100755
--- a/tests/mgbench/compare_results.py
+++ b/tests/mgbench/compare_results.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-# Copyright 2022 Memgraph Ltd.
+# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -97,8 +97,79 @@ def compare_results(results_from, results_to, fields, ignored, different_vendors
return ret
-def generate_remarkup(fields, data):
- ret = "==== Benchmark summary: ====\n\n"
+def generate_remarkup(fields, data, results_from=None, results_to=None):
+ ret = "\n"
+ ret += """
+
+ """
+ ret += "
Benchmark comparison
\n"
+ if results_from and results_to:
+ ret += """
+ Benchmark configuration
+
+
+ Configuration |
+ Reference vendor |
+ Vendor |
+
+
+ Vendor name |
+ {} |
+ {} |
+
+
+ Vendor condition |
+ {} |
+ {} |
+
+
+ Number of workers |
+ {} |
+ {} |
+
+
+ Single threaded runtime |
+ {} |
+ {} |
+
+
+ Platform |
+ {} |
+ {} |
+
+
+ """.format(
+ results_from["vendor"],
+ results_to["vendor"],
+ results_from["condition"],
+ results_to["condition"],
+ results_from["num_workers_for_benchmark"],
+ results_to["num_workers_for_benchmark"],
+ results_from["single_threaded_runtime_sec"],
+ results_to["single_threaded_runtime_sec"],
+ results_from["platform"],
+ results_to["platform"],
+ )
+ ret += """
+ How to read benchmark results
+ Throughput and latency values:
+ If vendor {} is faster than the reference vendor {} , the result for throughput and latency are show in green , otherwise red . Percentage difference is visible relative to reference vendor {}.
+ Memory usage:
+ If the vendor {} uses less memory then the reference vendor {} , the result is shown in green , otherwise red . Percentage difference for memory is visible relative to reference vendor {}.
+ """.format(
+ results_to["vendor"],
+ results_from["vendor"],
+ results_from["vendor"],
+ results_to["vendor"],
+ results_from["vendor"],
+ results_from["vendor"],
+ )
+
+ ret += "
Benchmark results
\n"
if len(data) > 0:
ret += "\n"
ret += " \n"
@@ -135,6 +206,7 @@ def generate_remarkup(fields, data):
ret += '{:.3f}{} //(new)// | \n'.format(value, field["unit"])
ret += "
\n"
ret += "
\n"
+ ret += "\n"
else:
ret += "No performance change detected.\n"
return ret
@@ -276,7 +348,11 @@ if __name__ == "__main__":
results_to = load_results(file_to)
data.update(compare_results(results_from, results_to, fields, ignored, args.different_vendors))
- remarkup = generate_remarkup(fields, data)
+ results_from_config = (
+ results_from["__run_configuration__"] if "__run_configuration__" in results_from.keys() else None
+ )
+ results_to_config = results_to["__run_configuration__"] if "__run_configuration__" in results_to.keys() else None
+ remarkup = generate_remarkup(fields, data, results_from=results_from_config, results_to=results_to_config)
if args.output:
with open(args.output, "w") as f:
f.write(remarkup)
diff --git a/tests/mgbench/cypher/ldbc_to_cypher.py b/tests/mgbench/cypher/ldbc_to_cypher.py
index 0bd06b471..fe9303798 100644
--- a/tests/mgbench/cypher/ldbc_to_cypher.py
+++ b/tests/mgbench/cypher/ldbc_to_cypher.py
@@ -1,3 +1,17 @@
+#!/usr/bin/env python3
+
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+# --- DISCLAIMER: This is NOT an official implementation of an LDBC Benchmark. ---
import argparse
import csv
import sys
@@ -24,7 +38,6 @@ BI_LINK = {
if __name__ == "__main__":
-
parser = argparse.ArgumentParser(
prog="LDBC CSV to CYPHERL converter",
description="""Converts all LDBC CSV files to CYPHERL transactions, for faster Memgraph load""",
@@ -42,7 +55,6 @@ if __name__ == "__main__":
output_directory.mkdir(exist_ok=True)
if args.type == "interactive":
-
NODES_INTERACTIVE = [
{"filename": "Place", "label": "Place"},
{"filename": "Organisation", "label": "Organisation"},
@@ -260,7 +272,6 @@ if __name__ == "__main__":
raise Exception("Didn't find the file that was needed!")
elif args.type == "bi":
-
NODES_BI = [
{"filename": "Place", "label": "Place"},
{"filename": "Organisation", "label": "Organisation"},
diff --git a/tests/mgbench/graph_bench.py b/tests/mgbench/graph_bench.py
index e0a4fac85..bcba55324 100644
--- a/tests/mgbench/graph_bench.py
+++ b/tests/mgbench/graph_bench.py
@@ -1,3 +1,16 @@
+#!/usr/bin/env python3
+
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
import argparse
import json
import subprocess
@@ -54,30 +67,61 @@ def parse_arguments():
help="Forward config for query",
)
+ parser.add_argument(
+ "--num-workers-for-benchmark",
+ type=int,
+ default=12,
+ help="number of workers used to execute the benchmark",
+ )
+
+ parser.add_argument(
+ "--query-count-lower-bound",
+ type=int,
+ default=300,
+ help="number of workers used to execute the benchmark (works only for isolated run)",
+ )
+
+ parser.add_argument(
+ "--single-threaded-runtime-sec",
+ type=int,
+ default=30,
+ help="Duration of single threaded benchmark per query (works only for isolated run)",
+ )
+
args = parser.parse_args()
return args
-def run_full_benchmarks(vendor, binary, dataset, dataset_size, dataset_group, realistic, mixed):
-
+def run_full_benchmarks(
+ vendor,
+ binary,
+ dataset,
+ dataset_size,
+ dataset_group,
+ realistic,
+ mixed,
+ workers,
+ query_count_lower_bound,
+ single_threaded_runtime_sec,
+):
configurations = [
# Basic isolated test cold
[
"--export-results",
- vendor + "_" + dataset + "_" + dataset_size + "_cold_isolated.json",
+ vendor + "_" + str(workers) + "_" + dataset + "_" + dataset_size + "_cold_isolated.json",
],
# Basic isolated test hot
[
"--export-results",
- vendor + "_" + dataset + "_" + dataset_size + "_hot_isolated.json",
+ vendor + "_" + str(workers) + "_" + dataset + "_" + dataset_size + "_hot_isolated.json",
"--warm-up",
"hot",
],
# Basic isolated test vulcanic
[
"--export-results",
- vendor + "_" + dataset + "_" + dataset_size + "_vulcanic_isolated.json",
+ vendor + "_" + str(workers) + "_" + dataset + "_" + dataset_size + "_vulcanic_isolated.json",
"--warm-up",
"vulcanic",
],
@@ -90,6 +134,8 @@ def run_full_benchmarks(vendor, binary, dataset, dataset_size, dataset_group, re
"--export-results",
vendor
+ "_"
+ + str(workers)
+ + "_"
+ dataset
+ "_"
+ dataset_size
@@ -106,6 +152,8 @@ def run_full_benchmarks(vendor, binary, dataset, dataset_size, dataset_group, re
"--export-results",
vendor
+ "_"
+ + str(workers)
+ + "_"
+ dataset
+ "_"
+ dataset_size
@@ -130,6 +178,8 @@ def run_full_benchmarks(vendor, binary, dataset, dataset_size, dataset_group, re
"--export-results",
vendor
+ "_"
+ + str(workers)
+ + "_"
+ dataset
+ "_"
+ dataset_size
@@ -146,6 +196,8 @@ def run_full_benchmarks(vendor, binary, dataset, dataset_size, dataset_group, re
"--export-results",
vendor
+ "_"
+ + str(workers)
+ + "_"
+ dataset
+ "_"
+ dataset_size
@@ -167,12 +219,17 @@ def run_full_benchmarks(vendor, binary, dataset, dataset_size, dataset_group, re
default_args = [
"python3",
"benchmark.py",
+ "vendor-native",
"--vendor-binary",
binary,
"--vendor-name",
vendor,
"--num-workers-for-benchmark",
- "12",
+ str(workers),
+ "--single-threaded-runtime-sec",
+ str(single_threaded_runtime_sec),
+ "--query-count-lower-bound",
+ str(query_count_lower_bound),
"--no-authorization",
dataset + "/" + dataset_size + "/" + dataset_group + "/*",
]
@@ -183,10 +240,12 @@ def run_full_benchmarks(vendor, binary, dataset, dataset_size, dataset_group, re
subprocess.run(args=full_config, check=True)
-def collect_all_results(vendor_name, dataset, dataset_size, dataset_group):
+def collect_all_results(vendor_name, dataset, dataset_size, dataset_group, workers):
working_directory = Path().absolute()
print(working_directory)
- results = sorted(working_directory.glob(vendor_name + "_" + dataset + "_" + dataset_size + "_*.json"))
+ results = sorted(
+ working_directory.glob(vendor_name + "_" + str(workers) + "_" + dataset + "_" + dataset_size + "_*.json")
+ )
summary = {dataset: {dataset_size: {dataset_group: {}}}}
for file in results:
@@ -210,7 +269,7 @@ def collect_all_results(vendor_name, dataset, dataset_size, dataset_group):
json_object = json.dumps(summary, indent=4)
print(json_object)
- with open(vendor_name + "_" + dataset + "_" + dataset_size + "_summary.json", "w") as f:
+ with open(vendor_name + "_" + str(workers) + "_" + dataset + "_" + dataset_size + "_summary.json", "w") as f:
json.dump(summary, f)
@@ -232,8 +291,13 @@ if __name__ == "__main__":
args.dataset_group,
realistic,
mixed,
+ args.num_workers_for_benchmark,
+ args.query_count_lower_bound,
+ args.single_threaded_runtime_sec,
+ )
+ collect_all_results(
+ vendor_name, args.dataset_name, args.dataset_size, args.dataset_group, args.num_workers_for_benchmark
)
- collect_all_results(vendor_name, args.dataset_name, args.dataset_size, args.dataset_group)
else:
raise Exception(
"Check that vendor: {} is supported and you are passing right path: {} to binary.".format(
diff --git a/tests/mgbench/how_to_use_benchgraph.md b/tests/mgbench/how_to_use_benchgraph.md
new file mode 100644
index 000000000..77864ac1c
--- /dev/null
+++ b/tests/mgbench/how_to_use_benchgraph.md
@@ -0,0 +1,219 @@
+# How to use mgBench
+
+Running your workloads that include custom queries and the dataset is the best way to evaluate system performance on your use case. Each workload has unique requirements that are imposed from the use case. Since your use-case queries and dataset will be used in production, it is best to use those.
+We worked on cleaning MgBench architecture so it is easier for users to add their custom workloads and queries to evaluate performance on supported systems.
+
+This tutorial contains the following content:
+
+- [How to add your custom workload](#how-to-add-your-custom-workload)
+- [How to run benchmarks on your custom workload](#how-to-run-benchmarks-on-your-custom-workload)
+- [How to configure benchmark run](#how-to-configure-benchmark-run)
+- [How to compare results](#how-to-compare-results)
+- [Customizing workload generator](#customizing-workload-generator)
+
+
+## How to add your custom workload
+
+If you want to run your custom workload on supported systems (Currently, Memgraph and Neo4j), you can start by writing a simple Python class. The idea is to specify a simple class that contains your dataset generation queries, index generation queries and queries used for running a benchmark.
+
+Here are 5 steps you need to do to specify your **workload**:
+
+1. [Inherit the workload class](#1-inherit-the-workload-class)
+2. [Define a workload name](#2-define-the-workload-name)
+3. [Implement dataset generator method](#3-implement-dataset-generator-method)
+4. [Implement index generator method](#4-implement-the-index-generator)
+5. [Define the queries you want to benchmark](#4-define-the-queries-you-want-to-benchmark)
+
+Here is the simplified version of [demo.py](https://github.com/memgraph/memgraph/blob/master/tests/mgbench/workloads/demo.py) example:
+
+```python
+import random
+from workloads.base import Workload
+
+class Demo(Workload):
+
+ NAME = "demo"
+
+ def indexes_generator(self):
+ indexes = [
+ ("CREATE INDEX ON :NodeA(id);", {}),
+ ("CREATE INDEX ON :NodeB(id);", {}),
+ ]
+ return indexes
+
+ def dataset_generator(self):
+
+ queries = []
+ for i in range(0, 100):
+ queries.append(("CREATE (:NodeA {id: $id});", {"id": i}))
+ queries.append(("CREATE (:NodeB {id: $id});", {"id": i}))
+ for i in range(0, 300):
+ a = random.randint(0, 99)
+ b = random.randint(0, 99)
+ queries.append(
+ (("MATCH(a:NodeA {id: $A_id}),(b:NodeB{id: $B_id}) CREATE (a)-[:EDGE]->(b)"), {"A_id": a, "B_id": b})
+ )
+
+ return queries
+
+ def benchmark__test__get_nodes(self):
+ return ("MATCH (n) RETURN n;", {})
+
+ def benchmark__test__get_node_by_id(self):
+ return ("MATCH (n:NodeA{id: $id}) RETURN n;", {"id": random.randint(0, 99)})
+
+
+```
+
+Let's break this script down into smaller important elements:
+
+### 1. Inherit the `workload` class
+The `Demo` script class has a parent class `Workload`. Each custom workload should inherit from the base `Workload` class.
+
+```python
+from workloads.base import Workload
+
+class Demo(Workload):
+```
+
+### 2. Define the workload name
+The class should specify the `NAME` property. This is used to describe what workload class you want to execute. When calling `benchmark.py`, this property will be used to differentiate different workloads.
+
+```python
+NAME = "demo"
+```
+
+### 3. Implement dataset generator method
+The class should implement the `dataset_generator()` method. The method generates a dataset that returns the ***list of tuples***. Each tuple contains a string of the Cypher query and dictionary that contains optional arguments, so the structure is following [(str, dict), (str, dict)...]. Let's take a look at how the example list could look like what it could method return:
+
+```python
+queries = [
+ ("CREATE (:NodeA {id: 23});", {}),
+ ("CREATE (:NodeB {id: $id, foo: $property});", {"id" : 123, "property": "foo" }),
+ ...
+]
+```
+As you can see, you can pass just a Cypher query as a pure string without any values in the dictionary.
+
+```python
+("CREATE (:NodeA {id: 23});", {}),
+```
+
+Or you can specify parameters inside a dictionary. The variables next to `$` sign in the query string will be replaced by the appropriate values behind the key from the dictionary. In this case `$id` is replaced by `123` and `$property` is replaced by `foo`. The dictionary key names and variable names need to match.
+
+```python
+("CREATE (:NodeB {id: $id, foo: $property});", {"id" : 123, "property": "foo" })
+```
+
+Back to our `demo.py` example, in the `dataset_generator()` method, here you specify queries for generating a dataset. In the first for loop the queries for creating 100 nodes with the label `NodeA` and 100 nodes with the label `NodeB` are prepared. Each node has `id` between 0 and 99. In the second for loop, queries for connecting nodes randomly are generated. There is a total of 300 edges, each connected to random `NodeA` and `NodeB`.
+
+```python
+def dataset_generator(self):
+
+ for i in range(0, 100):
+ queries.append(("CREATE (:NodeA {id: $id});", {"id" : i}))
+ queries.append(("CREATE (:NodeB {id: $id});", {"id" : i}))
+ for i in range(0, 300):
+ a = random.randint(0, 99)
+ b = random.randint(0, 99)
+ queries.append((("MATCH(a:NodeA {id: $A_id}),(b:NodeB{id: $B_id}) CREATE (a)-[:EDGE]->(b)"), {"A_id": a, "B_id" : b}))
+
+ return queries
+```
+
+### 4. Implement the index generator method
+
+The class should also implement the `indexes_generator()` method. This is implemented the same way as the `dataset_generator()` method, instead of queries for the dataset, `indexes_generator()` should return the list of indexes that will be used. The returning structure again is the list of tuples that contains query string and dictionary of parameters. Here is an example:
+
+```python
+def indexes_generator(self):
+ indexes = [
+ ("CREATE INDEX ON :NodeA(id);", {}),
+ ("CREATE INDEX ON :NodeB(id);", {}),
+ ]
+ return indexes
+```
+
+### 5. Define the queries you want to benchmark
+
+Now that your dataset will be imported from dataset generator queries, you can specify what queries you wish to benchmark on the given dataset. Here are two queries that `demo.py` workload defines. They are written as Python methods that return a single tuple with query and dictionary, as in the data generator method.
+
+```python
+def benchmark__test__get_nodes(self):
+ return ("MATCH (n) RETURN n;", {})
+
+def benchmark__test__get_node_by_id(self):
+ return ("MATCH (n:NodeA{id: $id}) RETURN n;", {"id": random.randint(0, 99)})
+
+```
+
+The necessary details here are that each of the methods you wish to use in the benchmark test needs to start with `benchmark__` in the name, otherwise, it will be ignored. The complete method name has the following structure `benchmark__group__name`. The group can be used to execute specific tests, but more on that later.
+
+From the workload setup, this is all you need to do. Next step is how to run your workload. If you wish to improve the workload generator, look at [customizing workload generator](#customizing-workload-generator).
+
+## How to run benchmarks on your custom workload
+
+When running benchmarks, duration, query arguments, number of workers, and database condition play an important role on the results of the benchmark. MgBench provides several options for the configuration of how the benchmark is executed. Let's start with the most straightforward run of the demo workload from the example above.
+
+The main script that manages benchmark execution is `benchmark.py`.
+
+To start the benchmark, you need to run the following command with your paths and options:
+
+```python3 benchmark.py vendor-docker --vendor-name (memgraph-docker||neo4j-docker) benchmarks demo/*/*/* --export-results result.json --no-authorization```
+
+To run this on memgraph, the command looks like this:
+
+```python3 benchmark.py vendor-docker --vendor-name memgraph-docker benchmarks demo/*/*/* --export-results results.json --no-authorization```
+
+## How to configure benchmark run
+
+Hopefully, you should get logs from `benchmark.py` process managing the benchmark and execution from the command above. The script takes a lot of arguments. Some used in the run above are self-explanatory. But let's break down the most important ones:
+
+- `NAME/VARIANT/GROUP/QUERY ` - The argument `demo/*/*/*` says to execute the workload named `demo`, and all of its variants, group's and queries. This flag is used for direct control of what workload you wish to execute. The `NAME` here is the name of the workload defined in the Workload class. `VARIANT` is an additional workload configuration, which will be explained a bit later. `GROUP` is defined in the query method name, and the `QUERY` is query name you wish to execute. If you want to execute a specific query from `demo.py`, it would look like this: `demo/*/test/get_nodes`. This will run `demo` workload on all `variants`, in `test` query group and query `get_nodes`.
+
+- `--single-threaded-runtime-sec` - The question at hand is how many of each specific queries you wish to execute as a sample for a database benchmark. Each query can take a different time to execute, so fixating a number could yield some queries finishing in 1 second and others running for a minute. This flag defines the duration in seconds that will be used to approximate how many queries you wish to execute. The default value is 10 seconds, this means the `benchmark.py` will generate predetermined numbers of queries to approximate single treaded runtime of 10 seconds. Increasing this will yield a longer running test.
+Each specific query will get a different count that specifies how many queries will be generated. This can be inspected after the test. For example, for 10 seconds of single-threaded runtime, the queries from demo workload `get_node_by_id` got 64230 different queries, while `get_nodes` got 5061 because of different time complexity of queries.
+
+- `--num-workers-for-benchmark` - The flag defines how many concurrent clients will open and query the database. With this flag, you can simulate different database users connecting to the database and executing queries. Each of the clients is independent and executes queries as fast as possible. They share a total pool of queries that were generated by the `--single-threaded-runtime-sec`. This means the total number of queries that need to be executed is shared between a specified number of workers.
+
+- `--warm-up` - The warm-up flag can take a three different arguments, `cold`, `hot` and `vulcanic`. Cold is the default. There is no warm-up being executed, `hot` will execute some predefined queries before the benchmark, while `vulcanic` will run the whole workload first before taking measurements. Here is the implementation of [warm-up](https://github.com/memgraph/memgraph/blob/master/tests/mgbench/benchmark.py#L186)
+
+
+## How to compare results
+
+
+Once the benchmark has been run, the results are saved in a file specified by `--export-results` argument. You can use the results files and compare them against other vendor results via the `compare_results.py` script:
+
+```python compare_results.py --compare path_to/run_1.json path_to/run_2.json --output run_1_vs_run_2.html --different-vendors```
+
+The output is an HTML file with a visual representation of the performance differences between two compared vendors. The first passed summary JSON file is the reference point. Feel free to open an HTML file in any browser at hand.
+
+## Customizing workload generator
+
+### How to run the same workload on the different vendors
+
+The base [Workload class](#1-inherit-the-workload-class) has benchmarking context information that contains all benchmark arguments used in this run. Some are mentioned above. The key argument here is the `--vendor-name`, which defines what database is being used in this benchmark.
+
+During the creation of your workload, you can access the parent class property by using `self.benchmark_context.vendor_name`. For example, if you want to specify special index creation for each vendor, the `indexes_generator()` could look like this:
+
+```python
+ def indexes_generator(self):
+ indexes = []
+ if "neo4j" in self.benchmark_context.vendor_name:
+ indexes.extend(
+ [
+ ("CREATE INDEX FOR (n:NodeA) ON (n.id);", {}),
+ ("CREATE INDEX FOR (n:NodeB) ON (n.id);", {}),
+ ]
+ )
+ else:
+ indexes.extend(
+ [
+ ("CREATE INDEX ON :NodeA(id);", {}),
+ ("CREATE INDEX ON :NodeB(id);", {}),
+ ]
+ )
+ return indexes
+```
+
+The same applies to the `dataset_generator()`. During the generation of the dataset, you can use special types of queries for different vendors.
diff --git a/tests/mgbench/log.py b/tests/mgbench/log.py
index 01a6771b3..64b648937 100644
--- a/tests/mgbench/log.py
+++ b/tests/mgbench/log.py
@@ -1,4 +1,4 @@
-# Copyright 2021 Memgraph Ltd.
+# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -33,7 +33,7 @@ def _log(color, *args):
def log(msg):
- print(msg)
+ print(str(msg))
logger.info(msg=msg)
@@ -60,3 +60,8 @@ def warning(*args):
def error(*args):
_log(COLOR_RED, *args)
logger.critical(*args)
+
+
+def summary(*args):
+ _log(COLOR_CYAN, *args)
+ logger.info(*args)
diff --git a/tests/mgbench/runners.py b/tests/mgbench/runners.py
index 923854dec..acd868377 100644
--- a/tests/mgbench/runners.py
+++ b/tests/mgbench/runners.py
@@ -13,6 +13,7 @@ import atexit
import json
import os
import re
+import socket
import subprocess
import tempfile
import threading
@@ -20,12 +21,15 @@ import time
from abc import ABC, abstractmethod
from pathlib import Path
+import log
from benchmark_context import BenchmarkContext
+DOCKER_NETWORK_NAME = "mgbench_network"
-def _wait_for_server(port, delay=0.1):
- cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
- while subprocess.call(cmd) != 0:
+
+def _wait_for_server_socket(port, ip="127.0.0.1", delay=0.1):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ while s.connect_ex((ip, int(port))) != 0:
time.sleep(0.01)
time.sleep(delay)
@@ -65,6 +69,27 @@ def _get_current_usage(pid):
return rss / 1024
+def _setup_docker_benchmark_network(network_name):
+ command = ["docker", "network", "ls", "--format", "{{.Name}}"]
+ networks = subprocess.run(command, check=True, capture_output=True, text=True).stdout.split("\n")
+ if network_name in networks:
+ return
+ else:
+ command = ["docker", "network", "create", network_name]
+ subprocess.run(command, check=True, capture_output=True, text=True)
+
+
+def _get_docker_container_ip(container_name):
+ command = [
+ "docker",
+ "inspect",
+ "--format",
+ "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
+ container_name,
+ ]
+ return subprocess.run(command, check=True, capture_output=True, text=True).stdout.strip()
+
+
class BaseClient(ABC):
@abstractmethod
def __init__(self, benchmark_context: BenchmarkContext):
@@ -97,26 +122,56 @@ class BoltClient(BaseClient):
queries=None,
file_path=None,
num_workers=1,
- max_retries: int = 50,
+ max_retries: int = 10000,
validation: bool = False,
time_dependent_execution: int = 0,
):
+ check_db_query = Path(self._directory.name) / "check_db_query.json"
+ with open(check_db_query, "w") as f:
+ query = ["RETURN 0;", {}]
+ json.dump(query, f)
+ f.write("\n")
+
+ check_db_args = self._get_args(
+ input=check_db_query,
+ num_workers=1,
+ max_retries=max_retries,
+ queries_json=True,
+ username=self._username,
+ password=self._password,
+ port=self._bolt_port,
+ validation=False,
+ time_dependent_execution=time_dependent_execution,
+ )
+
+ while True:
+ try:
+ subprocess.run(check_db_args, capture_output=True, text=True, check=True)
+ break
+ except subprocess.CalledProcessError as e:
+ log.log("Checking if database is up and running failed...")
+ log.warning("Reported errors from client:")
+ log.warning("Error: {}".format(e.stderr))
+ log.warning("Database is not up yet, waiting 3 seconds...")
+ time.sleep(3)
+
if (queries is None and file_path is None) or (queries is not None and file_path is not None):
raise ValueError("Either queries or input_path must be specified!")
- queries_json = False
+ queries_and_args_json = False
if queries is not None:
- queries_json = True
- file_path = os.path.join(self._directory.name, "queries.json")
+ queries_and_args_json = True
+ file_path = os.path.join(self._directory.name, "queries_and_args_json.json")
with open(file_path, "w") as f:
for query in queries:
json.dump(query, f)
f.write("\n")
+
args = self._get_args(
input=file_path,
num_workers=num_workers,
max_retries=max_retries,
- queries_json=queries_json,
+ queries_json=queries_and_args_json,
username=self._username,
password=self._password,
port=self._bolt_port,
@@ -131,12 +186,186 @@ class BoltClient(BaseClient):
error = ret.stderr.decode("utf-8").strip().split("\n")
data = ret.stdout.decode("utf-8").strip().split("\n")
if error and error[0] != "":
- print("Reported errros from client")
- print(error)
+ log.warning("Reported errors from client:")
+ log.warning("There is a possibility that query from: {} is not executed properly".format(file_path))
+ log.error(error)
+ log.error("Results for this query or benchmark run are probably invalid!")
data = [x for x in data if not x.startswith("[")]
return list(map(json.loads, data))
+class BoltClientDocker(BaseClient):
+ def __init__(self, benchmark_context: BenchmarkContext):
+ self._client_binary = benchmark_context.client_binary
+ self._directory = tempfile.TemporaryDirectory(dir=benchmark_context.temporary_directory)
+ self._username = ""
+ self._password = ""
+ self._bolt_port = (
+ benchmark_context.vendor_args["bolt-port"] if "bolt-port" in benchmark_context.vendor_args.keys() else 7687
+ )
+ self._container_name = "mgbench-bolt-client"
+ self._target_db_container = (
+ "memgraph_benchmark" if "memgraph" in benchmark_context.vendor_name else "neo4j_benchmark"
+ )
+
+ def _remove_container(self):
+ command = ["docker", "rm", "-f", self._container_name]
+ self._run_command(command)
+
+ def _create_container(self, *args):
+ command = [
+ "docker",
+ "create",
+ "--name",
+ self._container_name,
+ "--network",
+ DOCKER_NETWORK_NAME,
+ "memgraph/mgbench-client",
+ *args,
+ ]
+ self._run_command(command)
+
+ def _get_logs(self):
+ command = [
+ "docker",
+ "logs",
+ self._container_name,
+ ]
+ ret = self._run_command(command)
+ return ret
+
+ def _get_args(self, **kwargs):
+ return _convert_args_to_flags(**kwargs)
+
+ def execute(
+ self,
+ queries=None,
+ file_path=None,
+ num_workers=1,
+ max_retries: int = 50,
+ validation: bool = False,
+ time_dependent_execution: int = 0,
+ ):
+ if (queries is None and file_path is None) or (queries is not None and file_path is not None):
+ raise ValueError("Either queries or input_path must be specified!")
+
+ self._remove_container()
+ ip = _get_docker_container_ip(self._target_db_container)
+
+ # Perform a check to make sure the database is up and running
+ args = self._get_args(
+ address=ip,
+ input="/bin/check.json",
+ num_workers=1,
+ max_retries=max_retries,
+ queries_json=True,
+ username=self._username,
+ password=self._password,
+ port=self._bolt_port,
+ validation=False,
+ time_dependent_execution=0,
+ )
+
+ self._create_container(*args)
+
+ check_file = Path(self._directory.name) / "check.json"
+ with open(check_file, "w") as f:
+ query = ["RETURN 0;", {}]
+ json.dump(query, f)
+ f.write("\n")
+
+ command = [
+ "docker",
+ "cp",
+ check_file.resolve().as_posix(),
+ self._container_name + ":/bin/" + check_file.name,
+ ]
+ self._run_command(command)
+
+ command = [
+ "docker",
+ "start",
+ "-i",
+ self._container_name,
+ ]
+ while True:
+ try:
+ self._run_command(command)
+ break
+ except subprocess.CalledProcessError as e:
+ log.log("Checking if database is up and running failed!")
+ log.warning("Reported errors from client:")
+ log.warning("Error: {}".format(e.stderr))
+ log.warning("Database is not up yet, waiting 3 second")
+ time.sleep(3)
+
+ self._remove_container()
+
+ queries_and_args_json = False
+ if queries is not None:
+ queries_and_args_json = True
+ file_path = os.path.join(self._directory.name, "queries.json")
+ with open(file_path, "w") as f:
+ for query in queries:
+ json.dump(query, f)
+ f.write("\n")
+
+ self._remove_container()
+ ip = _get_docker_container_ip(self._target_db_container)
+
+ # Query file JSON or Cypher
+ file = Path(file_path)
+
+ args = self._get_args(
+ address=ip,
+ input="/bin/" + file.name,
+ num_workers=num_workers,
+ max_retries=max_retries,
+ queries_json=queries_and_args_json,
+ username=self._username,
+ password=self._password,
+ port=self._bolt_port,
+ validation=validation,
+ time_dependent_execution=time_dependent_execution,
+ )
+
+ self._create_container(*args)
+
+ command = [
+ "docker",
+ "cp",
+ file.resolve().as_posix(),
+ self._container_name + ":/bin/" + file.name,
+ ]
+ self._run_command(command)
+ log.log("Starting query execution...")
+ try:
+ command = [
+ "docker",
+ "start",
+ "-i",
+ self._container_name,
+ ]
+ self._run_command(command)
+ except subprocess.CalledProcessError as e:
+ log.warning("Reported errors from client:")
+ log.warning("Error: {}".format(e.stderr))
+
+ ret = self._get_logs()
+ error = ret.stderr.strip().split("\n")
+ if error and error[0] != "":
+ log.warning("There is a possibility that query from: {} is not executed properly".format(file_path))
+ log.warning(*error)
+ data = ret.stdout.strip().split("\n")
+ data = [x for x in data if not x.startswith("[")]
+ return list(map(json.loads, data))
+
+ def _run_command(self, command):
+ ret = subprocess.run(command, capture_output=True, check=True, text=True)
+ time.sleep(0.2)
+ return ret
+
+
class BaseRunner(ABC):
subclasses = {}
@@ -159,15 +388,19 @@ class BaseRunner(ABC):
self.benchmark_context = benchmark_context
@abstractmethod
- def start_benchmark(self):
+ def start_db_init(self):
pass
@abstractmethod
- def start_preparation(self):
+ def stop_db_init(self):
pass
@abstractmethod
- def stop(self):
+ def start_db(self):
+ pass
+
+ @abstractmethod
+ def stop_db(self):
pass
@abstractmethod
@@ -186,11 +419,6 @@ class Memgraph(BaseRunner):
self._performance_tracking = benchmark_context.performance_tracking
self._directory = tempfile.TemporaryDirectory(dir=benchmark_context.temporary_directory)
self._vendor_args = benchmark_context.vendor_args
- self._properties_on_edges = (
- self._vendor_args["no-properties-on-edges"]
- if "no-properties-on-edges" in self._vendor_args.keys()
- else False
- )
self._bolt_port = self._vendor_args["bolt-port"] if "bolt-port" in self._vendor_args.keys() else 7687
self._proc_mg = None
self._stop_event = threading.Event()
@@ -211,7 +439,9 @@ class Memgraph(BaseRunner):
data_directory = os.path.join(self._directory.name, "memgraph")
kwargs["bolt_port"] = self._bolt_port
kwargs["data_directory"] = data_directory
- kwargs["storage_properties_on_edges"] = self._properties_on_edges
+ kwargs["storage_properties_on_edges"] = True
+ for key, value in self._vendor_args.items():
+ kwargs[key] = value
return _convert_args_to_flags(self._memgraph_binary, **kwargs)
def _start(self, **kwargs):
@@ -223,7 +453,7 @@ class Memgraph(BaseRunner):
if self._proc_mg.poll() is not None:
self._proc_mg = None
raise Exception("The database process died prematurely!")
- _wait_for_server(self._bolt_port)
+ _wait_for_server_socket(self._bolt_port)
ret = self._proc_mg.poll()
assert ret is None, "The database process died prematurely " "({})!".format(ret)
@@ -236,21 +466,37 @@ class Memgraph(BaseRunner):
self._proc_mg = None
return ret, usage
- def start_preparation(self, workload):
+ def start_db_init(self, workload):
if self._performance_tracking:
p = threading.Thread(target=self.res_background_tracking, args=(self._rss, self._stop_event))
self._stop_event.clear()
self._rss.clear()
p.start()
- self._start(storage_snapshot_on_exit=True)
+ self._start(storage_snapshot_on_exit=True, **self._vendor_args)
- def start_benchmark(self, workload):
+ def stop_db_init(self, workload):
+ if self._performance_tracking:
+ self._stop_event.set()
+ self.dump_rss(workload)
+ ret, usage = self._cleanup()
+ assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
+ return usage
+
+ def start_db(self, workload):
if self._performance_tracking:
p = threading.Thread(target=self.res_background_tracking, args=(self._rss, self._stop_event))
self._stop_event.clear()
self._rss.clear()
p.start()
- self._start(storage_recover_on_startup=True)
+ self._start(storage_recover_on_startup=True, **self._vendor_args)
+
+ def stop_db(self, workload):
+ if self._performance_tracking:
+ self._stop_event.set()
+ self.dump_rss(workload)
+ ret, usage = self._cleanup()
+ assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
+ return usage
def clean_db(self):
if self._proc_mg is not None:
@@ -284,14 +530,6 @@ class Memgraph(BaseRunner):
f.write("\n")
f.close()
- def stop(self, workload):
- if self._performance_tracking:
- self._stop_event.set()
- self.dump_rss(workload)
- ret, usage = self._cleanup()
- assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
- return usage
-
def fetch_client(self) -> BoltClient:
return BoltClient(benchmark_context=self.benchmark_context)
@@ -304,6 +542,14 @@ class Neo4j(BaseRunner):
self._neo4j_config = self._neo4j_path / "conf" / "neo4j.conf"
self._neo4j_pid = self._neo4j_path / "run" / "neo4j.pid"
self._neo4j_admin = self._neo4j_path / "bin" / "neo4j-admin"
+ self._neo4j_dump = (
+ Path()
+ / ".cache"
+ / "datasets"
+ / self.benchmark_context.get_active_workload()
+ / self.benchmark_context.get_active_variant()
+ / "neo4j.dump"
+ )
self._performance_tracking = benchmark_context.performance_tracking
self._vendor_args = benchmark_context.vendor_args
self._stop_event = threading.Event()
@@ -372,13 +618,13 @@ class Neo4j(BaseRunner):
raise Exception("The database process is already running!")
args = _convert_args_to_flags(self._neo4j_binary, "start", **kwargs)
start_proc = subprocess.run(args, check=True)
- time.sleep(5)
+ time.sleep(0.5)
if self._neo4j_pid.exists():
print("Neo4j started!")
else:
raise Exception("The database process died prematurely!")
print("Run server check:")
- _wait_for_server(self._bolt_port)
+ _wait_for_server_socket(self._bolt_port)
def _cleanup(self):
if self._neo4j_pid.exists():
@@ -391,7 +637,7 @@ class Neo4j(BaseRunner):
else:
return 0
- def start_preparation(self, workload):
+ def start_db_init(self, workload):
if self._performance_tracking:
p = threading.Thread(target=self.res_background_tracking, args=(self._rss, self._stop_event))
self._stop_event.clear()
@@ -404,18 +650,48 @@ class Neo4j(BaseRunner):
if self._performance_tracking:
self.get_memory_usage("start_" + workload)
- def start_benchmark(self, workload):
+ def stop_db_init(self, workload):
+ if self._performance_tracking:
+ self._stop_event.set()
+ self.get_memory_usage("stop_" + workload)
+ self.dump_rss(workload)
+ ret, usage = self._cleanup()
+ self.dump_db(path=self._neo4j_dump.parent)
+ assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
+ return usage
+
+ def start_db(self, workload):
if self._performance_tracking:
p = threading.Thread(target=self.res_background_tracking, args=(self._rss, self._stop_event))
self._stop_event.clear()
self._rss.clear()
p.start()
+
+ neo4j_dump = (
+ Path()
+ / ".cache"
+ / "datasets"
+ / self.benchmark_context.get_active_workload()
+ / self.benchmark_context.get_active_variant()
+ / "neo4j.dump"
+ )
+ if neo4j_dump.exists():
+ self.load_db_from_dump(path=neo4j_dump.parent)
# Start DB
self._start()
if self._performance_tracking:
self.get_memory_usage("start_" + workload)
+ def stop_db(self, workload):
+ if self._performance_tracking:
+ self._stop_event.set()
+ self.get_memory_usage("stop_" + workload)
+ self.dump_rss(workload)
+ ret, usage = self._cleanup()
+ assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
+ return usage
+
def dump_db(self, path):
print("Dumping the neo4j database...")
if self._neo4j_pid.exists():
@@ -426,7 +702,7 @@ class Neo4j(BaseRunner):
self._neo4j_admin,
"database",
"dump",
- "--overwrite-destination=false",
+ "--overwrite-destination=true",
"--to-path",
path,
"neo4j",
@@ -478,20 +754,10 @@ class Neo4j(BaseRunner):
def is_stopped(self):
pid_file = self._neo4j_path / "run" / "neo4j.pid"
if pid_file.exists():
-
return False
else:
return True
- def stop(self, workload):
- if self._performance_tracking:
- self._stop_event.set()
- self.get_memory_usage("stop_" + workload)
- self.dump_rss(workload)
- ret, usage = self._cleanup()
- assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
- return usage
-
def dump_rss(self, workload):
file_name = workload + "_rss"
Path.mkdir(Path().cwd() / "neo4j_memory", exist_ok=True)
@@ -521,3 +787,311 @@ class Neo4j(BaseRunner):
def fetch_client(self) -> BoltClient:
return BoltClient(benchmark_context=self.benchmark_context)
+
+
+class MemgraphDocker(BaseRunner):
+ def __init__(self, benchmark_context: BenchmarkContext):
+ super().__init__(benchmark_context=benchmark_context)
+ self._directory = tempfile.TemporaryDirectory(dir=benchmark_context.temporary_directory)
+ self._vendor_args = benchmark_context.vendor_args
+ self._bolt_port = self._vendor_args["bolt-port"] if "bolt-port" in self._vendor_args.keys() else "7687"
+ self._container_name = "memgraph_benchmark"
+ self._container_ip = None
+ self._config_file = None
+ _setup_docker_benchmark_network(network_name=DOCKER_NETWORK_NAME)
+
+ def _set_args(self, **kwargs):
+ return _convert_args_to_flags(**kwargs)
+
+ def start_db_init(self, message):
+ log.init("Starting database for import...")
+ try:
+ command = [
+ "docker",
+ "run",
+ "--detach",
+ "--network",
+ DOCKER_NETWORK_NAME,
+ "--name",
+ self._container_name,
+ "-it",
+ "-p",
+ self._bolt_port + ":" + self._bolt_port,
+ "memgraph/memgraph:2.7.0",
+ "--storage_wal_enabled=false",
+ "--storage_recover_on_startup=true",
+ "--storage_snapshot_interval_sec",
+ "0",
+ ]
+ command.extend(self._set_args(**self._vendor_args))
+ ret = self._run_command(command)
+ except subprocess.CalledProcessError as e:
+ log.error("Failed to start Memgraph docker container.")
+ log.error(
+ "There is probably a database running on that port, please stop the running container and try again."
+ )
+ raise e
+
+ command = [
+ "docker",
+ "cp",
+ self._container_name + ":/etc/memgraph/memgraph.conf",
+ self._directory.name + "/memgraph.conf",
+ ]
+ self._run_command(command)
+ self._config_file = Path(self._directory.name + "/memgraph.conf")
+ _wait_for_server_socket(self._bolt_port, delay=0.5)
+ log.log("Database started.")
+
+ def stop_db_init(self, message):
+ log.init("Stopping database...")
+ usage = self._get_cpu_memory_usage()
+
+ # Stop to save the snapshot
+ command = ["docker", "stop", self._container_name]
+ self._run_command(command)
+
+ # Change config back to default
+ argument = "--storage-snapshot-on-exit=false"
+ self._replace_config_args(argument)
+ command = [
+ "docker",
+ "cp",
+ self._config_file.resolve(),
+ self._container_name + ":/etc/memgraph/memgraph.conf",
+ ]
+ self._run_command(command)
+ log.log("Database stopped.")
+ return usage
+
+ def start_db(self, message):
+ log.init("Starting database for benchmark...")
+ command = ["docker", "start", self._container_name]
+ self._run_command(command)
+ ip_address = _get_docker_container_ip(self._container_name)
+ _wait_for_server_socket(self._bolt_port, delay=0.5)
+ log.log("Database started.")
+
+ def stop_db(self, message):
+ log.init("Stopping database...")
+ usage = self._get_cpu_memory_usage()
+ command = ["docker", "stop", self._container_name]
+ self._run_command(command)
+ log.log("Database stopped.")
+ return usage
+
+ def clean_db(self):
+ self.remove_container(self._container_name)
+
+ def fetch_client(self) -> BaseClient:
+ return BoltClientDocker(benchmark_context=self.benchmark_context)
+
+ def remove_container(self, containerName):
+ command = ["docker", "rm", "-f", containerName]
+ self._run_command(command)
+
+ def _replace_config_args(self, argument):
+ config_lines = []
+ with self._config_file.open("r") as file:
+ lines = file.readlines()
+ file.close()
+ key, value = argument.split("=")
+ for line in lines:
+ if line[0] == "#" or line.strip("\n") == "":
+ config_lines.append(line)
+ else:
+ key_file, value_file = line.split("=")
+ if key_file == key and value != value_file:
+ line = argument + "\n"
+ config_lines.append(line)
+
+ with self._config_file.open("w") as file:
+ file.writelines(config_lines)
+ file.close()
+
+ def _get_cpu_memory_usage(self):
+ command = [
+ "docker",
+ "exec",
+ "-it",
+ self._container_name,
+ "bash",
+ "-c",
+ "grep ^VmPeak /proc/1/status",
+ ]
+ usage = {"cpu": 0, "memory": 0}
+ ret = self._run_command(command)
+ memory = ret.stdout.split()
+ usage["memory"] = int(memory[1]) * 1024
+
+ command = [
+ "docker",
+ "exec",
+ "-it",
+ self._container_name,
+ "bash",
+ "-c",
+ "cat /proc/1/stat",
+ ]
+ stat = self._run_command(command).stdout.strip("\n")
+
+ command = [
+ "docker",
+ "exec",
+ "-it",
+ self._container_name,
+ "bash",
+ "-c",
+ "getconf CLK_TCK",
+ ]
+ CLK_TCK = int(self._run_command(command).stdout.strip("\n"))
+
+ cpu_time = sum(map(int, stat.split(")")[1].split()[11:15])) / CLK_TCK
+ usage["cpu"] = cpu_time
+
+ return usage
+
+ def _run_command(self, command):
+ ret = subprocess.run(command, check=True, capture_output=True, text=True)
+
+ time.sleep(0.2)
+ return ret
+
+
+class Neo4jDocker(BaseRunner):
+ def __init__(self, benchmark_context: BenchmarkContext):
+ super().__init__(benchmark_context=benchmark_context)
+ self._directory = tempfile.TemporaryDirectory(dir=benchmark_context.temporary_directory)
+ self._vendor_args = benchmark_context.vendor_args
+ self._bolt_port = self._vendor_args["bolt-port"] if "bolt-port" in self._vendor_args.keys() else "7687"
+ self._container_name = "neo4j_benchmark"
+ self._container_ip = None
+ self._config_file = None
+ _setup_docker_benchmark_network(DOCKER_NETWORK_NAME)
+
+ def _set_args(self, **kwargs):
+ return _convert_args_to_flags(**kwargs)
+
+ def start_db_init(self, message):
+ log.init("Starting database for initialization...")
+ try:
+ command = [
+ "docker",
+ "run",
+ "--detach",
+ "--network",
+ DOCKER_NETWORK_NAME,
+ "--name",
+ self._container_name,
+ "-it",
+ "-p",
+ self._bolt_port + ":" + self._bolt_port,
+ "--env",
+ "NEO4J_AUTH=none",
+ "neo4j:5.6.0",
+ ]
+ command.extend(self._set_args(**self._vendor_args))
+ ret = self._run_command(command)
+ except subprocess.CalledProcessError as e:
+ log.error("There was an error starting the Neo4j container!")
+ log.error(
+ "There is probably a database running on that port, please stop the running container and try again."
+ )
+ raise e
+ _wait_for_server_socket(self._bolt_port, delay=5)
+ log.log("Database started.")
+
+ def stop_db_init(self, message):
+ log.init("Stopping database...")
+ usage = self._get_cpu_memory_usage()
+
+ command = ["docker", "stop", self._container_name]
+ self._run_command(command)
+ log.log("Database stopped.")
+
+ return usage
+
+ def start_db(self, message):
+ log.init("Starting database...")
+ command = ["docker", "start", self._container_name]
+ self._run_command(command)
+ _wait_for_server_socket(self._bolt_port, delay=5)
+ log.log("Database started.")
+
+ def stop_db(self, message):
+ log.init("Stopping database...")
+ usage = self._get_cpu_memory_usage()
+
+ command = ["docker", "stop", self._container_name]
+ self._run_command(command)
+ log.log("Database stopped.")
+ return usage
+
+ def clean_db(self):
+ self.remove_container(self._container_name)
+
+ def fetch_client(self) -> BaseClient:
+ return BoltClientDocker(benchmark_context=self.benchmark_context)
+
+ def remove_container(self, containerName):
+ command = ["docker", "rm", "-f", containerName]
+ self._run_command(command)
+
+ def _get_cpu_memory_usage(self):
+ command = [
+ "docker",
+ "exec",
+ "-it",
+ self._container_name,
+ "bash",
+ "-c",
+ "cat /var/lib/neo4j/run/neo4j.pid",
+ ]
+ ret = self._run_command(command)
+ pid = ret.stdout.split()[0]
+
+ command = [
+ "docker",
+ "exec",
+ "-it",
+ self._container_name,
+ "bash",
+ "-c",
+ "grep ^VmPeak /proc/{}/status".format(pid),
+ ]
+ usage = {"cpu": 0, "memory": 0}
+ ret = self._run_command(command)
+ memory = ret.stdout.split()
+ usage["memory"] = int(memory[1]) * 1024
+
+ command = [
+ "docker",
+ "exec",
+ "-it",
+ self._container_name,
+ "bash",
+ "-c",
+ "cat /proc/{}/stat".format(pid),
+ ]
+ stat = self._run_command(command).stdout.strip("\n")
+
+ command = [
+ "docker",
+ "exec",
+ "-it",
+ self._container_name,
+ "bash",
+ "-c",
+ "getconf CLK_TCK",
+ ]
+ CLK_TCK = int(self._run_command(command).stdout.strip("\n"))
+
+ cpu_time = sum(map(int, stat.split(")")[1].split()[11:15])) / CLK_TCK
+ usage["cpu"] = cpu_time
+
+ return usage
+
+ def _run_command(self, command):
+ ret = subprocess.run(command, capture_output=True, check=True, text=True)
+ time.sleep(0.2)
+ return ret
diff --git a/tests/mgbench/setup.py b/tests/mgbench/setup.py
new file mode 100644
index 000000000..621078ce6
--- /dev/null
+++ b/tests/mgbench/setup.py
@@ -0,0 +1,34 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import subprocess
+import sys
+from subprocess import CalledProcessError
+
+import log
+from benchmark_context import BenchmarkContext
+
+
+def check_requirements(benchmark_context: BenchmarkContext):
+ if "docker" in benchmark_context.vendor_name:
+ log.info("Checking requirements ... ")
+ command = ["docker", "info"]
+ try:
+ subprocess.run(command, check=True, capture_output=True, text=True)
+ except CalledProcessError:
+ log.error("Docker is not installed or not running")
+ return False
+
+ if sys.version_info.major < 3 or sys.version_info.minor < 6:
+ log.error("Python version 3.6 or higher is required")
+ return False
+
+ return True
diff --git a/tests/mgbench/validation.py b/tests/mgbench/validation.py
index d32d1d61b..680979ad4 100644
--- a/tests/mgbench/validation.py
+++ b/tests/mgbench/validation.py
@@ -1,3 +1,16 @@
+#!/usr/bin/env python3
+
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
import argparse
import copy
import multiprocessing
@@ -11,7 +24,6 @@ from workloads import base
def pars_args():
-
parser = argparse.ArgumentParser(
prog="Validator for individual query checking",
description="""Validates that query is running, and validates output between different vendors""",
@@ -90,7 +102,6 @@ def get_queries(gen, count):
if __name__ == "__main__":
-
args = pars_args()
benchmark_context_db_1 = BenchmarkContext(
@@ -120,19 +131,18 @@ if __name__ == "__main__":
results_db_1 = {}
for workload, queries in workloads:
-
vendor_runner.clean_db()
generated_queries = workload.dataset_generator()
if generated_queries:
- vendor_runner.start_preparation("import")
+ vendor_runner.start_db_init("import")
client.execute(queries=generated_queries, num_workers=benchmark_context_db_1.num_workers_for_import)
- vendor_runner.stop("import")
+ vendor_runner.stop_db_init("import")
else:
workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
imported = workload.custom_import()
if not imported:
- vendor_runner.start_preparation("import")
+ vendor_runner.start_db_init("import")
print("Executing database cleanup and index setup...")
client.execute(
file_path=workload.get_index(), num_workers=benchmark_context_db_1.num_workers_for_import
@@ -141,14 +151,14 @@ if __name__ == "__main__":
ret = client.execute(
file_path=workload.get_file(), num_workers=benchmark_context_db_1.num_workers_for_import
)
- usage = vendor_runner.stop("import")
+ usage = vendor_runner.stop_db_init("import")
for group in sorted(queries.keys()):
for query, funcname in queries[group]:
print("Running query:{}/{}/{}".format(group, query, funcname))
func = getattr(workload, funcname)
count = 1
- vendor_runner.start_benchmark("validation")
+ vendor_runner.start_db("validation")
try:
ret = client.execute(queries=get_queries(func, count), num_workers=1, validation=True)[0]
results_db_1[funcname] = ret["results"].items()
@@ -157,7 +167,7 @@ if __name__ == "__main__":
print(e)
results_db_1[funcname] = "Query not executed properly"
finally:
- usage = vendor_runner.stop("validation")
+ usage = vendor_runner.stop_db("validation")
print("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
print("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
@@ -182,19 +192,18 @@ if __name__ == "__main__":
results_db_2 = {}
for workload, queries in workloads:
-
vendor_runner.clean_db()
generated_queries = workload.dataset_generator()
if generated_queries:
- vendor_runner.start_preparation("import")
+ vendor_runner.start_db_init("import")
client.execute(queries=generated_queries, num_workers=benchmark_context_db_2.num_workers_for_import)
vendor_runner.stop("import")
else:
workload.prepare(cache.cache_directory("datasets", workload.NAME, workload.get_variant()))
imported = workload.custom_import()
if not imported:
- vendor_runner.start_preparation("import")
+ vendor_runner.start_db_init("import")
print("Executing database cleanup and index setup...")
client.execute(
file_path=workload.get_index(), num_workers=benchmark_context_db_2.num_workers_for_import
@@ -203,14 +212,14 @@ if __name__ == "__main__":
ret = client.execute(
file_path=workload.get_file(), num_workers=benchmark_context_db_2.num_workers_for_import
)
- usage = vendor_runner.stop("import")
+ usage = vendor_runner.stop_db_init("import")
for group in sorted(queries.keys()):
for query, funcname in queries[group]:
print("Running query:{}/{}/{}".format(group, query, funcname))
func = getattr(workload, funcname)
count = 1
- vendor_runner.start_benchmark("validation")
+ vendor_runner.start_db("validation")
try:
ret = client.execute(queries=get_queries(func, count), num_workers=1, validation=True)[0]
results_db_2[funcname] = ret["results"].items()
@@ -219,7 +228,7 @@ if __name__ == "__main__":
print(e)
results_db_2[funcname] = "Query not executed properly"
finally:
- usage = vendor_runner.stop("validation")
+ usage = vendor_runner.stop_db("validation")
print("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
print("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
diff --git a/tests/mgbench/workloads/__init__.py b/tests/mgbench/workloads/__init__.py
index ed172b041..839099034 100644
--- a/tests/mgbench/workloads/__init__.py
+++ b/tests/mgbench/workloads/__init__.py
@@ -1,3 +1,14 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
from pathlib import Path
modules = Path(__file__).resolve().parent.glob("*.py")
diff --git a/tests/mgbench/workloads/base.py b/tests/mgbench/workloads/base.py
index d6125ab16..12ebe4002 100644
--- a/tests/mgbench/workloads/base.py
+++ b/tests/mgbench/workloads/base.py
@@ -1,4 +1,4 @@
-# Copyright 2022 Memgraph Ltd.
+# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -19,7 +19,6 @@ from benchmark_context import BenchmarkContext
# Base dataset class used as a template to create each individual dataset. All
# common logic is handled here.
class Workload(ABC):
-
# Name of the workload/dataset.
NAME = ""
# List of all variants of the workload/dataset that exist.
@@ -48,6 +47,7 @@ class Workload(ABC):
def __init_subclass__(cls) -> None:
name_prerequisite = "NAME" in cls.__dict__
generator_prerequisite = "dataset_generator" in cls.__dict__
+ generator_indexes_prerequisite = "indexes_generator" in cls.__dict__
custom_import_prerequisite = "custom_import" in cls.__dict__
basic_import_prerequisite = ("LOCAL_FILE" in cls.__dict__ or "URL_FILE" in cls.__dict__) and (
"LOCAL_INDEX_FILE" in cls.__dict__ or "URL_INDEX_FILE" in cls.__dict__
@@ -55,21 +55,20 @@ class Workload(ABC):
if not name_prerequisite:
raise ValueError(
- """Can't define a workload class {} without NAME property:
- NAME = "dataset name"
- Name property defines the workload you want to execute, for example: "demo/*/*/*"
- """.format(
+ """
+ Can't define a workload class {} without NAME property: NAME = "dataset name"
+ Name property defines the workload you want to execute, for example: "demo/*/*/*"
+ """.format(
cls.__name__
)
)
- # Check workload is in generator or dataset mode during interpretation (not both), not runtime
if generator_prerequisite and (custom_import_prerequisite or basic_import_prerequisite):
raise ValueError(
"""
- The workload class {} cannot have defined dataset import and generate dataset at
- the same time.
- """.format(
+ The workload class {} cannot have defined dataset import and generate dataset at
+ the same time.
+ """.format(
cls.__name__
)
)
@@ -77,12 +76,15 @@ class Workload(ABC):
if not generator_prerequisite and (not custom_import_prerequisite and not basic_import_prerequisite):
raise ValueError(
"""
- The workload class {} need to have defined dataset import or dataset generator
- """.format(
+ The workload class {} need to have defined dataset import or dataset generator
+ """.format(
cls.__name__
)
)
+ if generator_prerequisite and not generator_indexes_prerequisite:
+ raise ValueError("The workload class {} need to define indexes_generator for generating a dataset. ")
+
return super().__init_subclass__()
def __init__(self, variant: str = None, benchmark_context: BenchmarkContext = None):
diff --git a/tests/mgbench/workloads/demo.py b/tests/mgbench/workloads/demo.py
index 2b758d5ef..0a0f6f2e0 100644
--- a/tests/mgbench/workloads/demo.py
+++ b/tests/mgbench/workloads/demo.py
@@ -1,28 +1,56 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
import random
from workloads.base import Workload
class Demo(Workload):
-
NAME = "demo"
+ def indexes_generator(self):
+ indexes = []
+ if "neo4j" in self.benchmark_context.vendor_name:
+ indexes.extend(
+ [
+ ("CREATE INDEX FOR (n:NodeA) ON (n.id);", {}),
+ ("CREATE INDEX FOR (n:NodeB) ON (n.id);", {}),
+ ]
+ )
+ else:
+ indexes.extend(
+ [
+ ("CREATE INDEX ON :NodeA(id);", {}),
+ ("CREATE INDEX ON :NodeB(id);", {}),
+ ]
+ )
+ return indexes
+
def dataset_generator(self):
-
- queries = [("MATCH (n) DETACH DELETE n;", {})]
- for i in range(0, 100):
- queries.append(("CREATE (:NodeA{{ id:{}}});".format(i), {}))
- queries.append(("CREATE (:NodeB{{ id:{}}});".format(i), {}))
-
- for i in range(0, 100):
- a = random.randint(0, 99)
- b = random.randint(0, 99)
- queries.append(("MATCH(a:NodeA{{ id: {}}}),(b:NodeB{{id: {}}}) CREATE (a)-[:EDGE]->(b)".format(a, b), {}))
+ queries = []
+ for i in range(0, 10000):
+ queries.append(("CREATE (:NodeA {id: $id});", {"id": i}))
+ queries.append(("CREATE (:NodeB {id: $id});", {"id": i}))
+ for i in range(0, 50000):
+ a = random.randint(0, 9999)
+ b = random.randint(0, 9999)
+ queries.append(
+ (("MATCH(a:NodeA {id: $A_id}),(b:NodeB{id: $B_id}) CREATE (a)-[:EDGE]->(b)"), {"A_id": a, "B_id": b})
+ )
return queries
- def benchmark__test__sample_query1(self):
- return ("MATCH (n) RETURN n", {})
+ def benchmark__test__get_nodes(self):
+ return ("MATCH (n) RETURN n;", {})
- def benchmark__test__sample_query2(self):
- return ("MATCH (n) RETURN n", {})
+ def benchmark__test__get_node_by_id(self):
+ return ("MATCH (n:NodeA{id: $id}) RETURN n;", {"id": random.randint(0, 9999)})
diff --git a/tests/mgbench/workloads/importers/importer_ldbc_bi.py b/tests/mgbench/workloads/importers/importer_ldbc_bi.py
index 6c84ba75d..9c4cdf332 100644
--- a/tests/mgbench/workloads/importers/importer_ldbc_bi.py
+++ b/tests/mgbench/workloads/importers/importer_ldbc_bi.py
@@ -1,3 +1,15 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+# --- DISCLAIMER: This is NOT an official implementation of an LDBC Benchmark. ---
import csv
import subprocess
from collections import defaultdict
@@ -21,7 +33,6 @@ class ImporterLDBCBI:
self._csv_dict = csv_dict
def execute_import(self):
-
vendor_runner = BaseRunner.create(
benchmark_context=self._benchmark_context,
)
@@ -30,8 +41,8 @@ class ImporterLDBCBI:
if self._benchmark_context.vendor_name == "neo4j":
data_dir = Path() / ".cache" / "datasets" / self._dataset_name / self._variant / "data_neo4j"
data_dir.mkdir(parents=True, exist_ok=True)
- dir_name = self._csv_dict[self._variant].split("/")[-1:][0].removesuffix(".tar.zst")
- if (data_dir / dir_name).exists():
+ dir_name = self._csv_dict[self._variant].split("/")[-1:][0].replace(".tar.zst", "")
+ if (data_dir / dir_name).exists() and any((data_dir / dir_name).iterdir()):
print("Files downloaded")
data_dir = data_dir / dir_name
else:
@@ -42,7 +53,7 @@ class ImporterLDBCBI:
headers_dir = Path() / ".cache" / "datasets" / self._dataset_name / self._variant / "headers_neo4j"
headers_dir.mkdir(parents=True, exist_ok=True)
- headers = HEADERS_URL.split("/")[-1:][0].removesuffix(".tar.gz")
+ headers = HEADERS_URL.split("/")[-1:][0].replace(".tar.gz", "")
if (headers_dir / headers).exists():
print("Header files downloaded.")
else:
@@ -204,10 +215,10 @@ class ImporterLDBCBI:
check=True,
)
- vendor_runner.start_preparation("Index preparation")
+ vendor_runner.start_db_init("Index preparation")
print("Executing database index setup")
client.execute(file_path=self._index_file, num_workers=1)
- vendor_runner.stop("Stop index preparation")
+ vendor_runner.stop_db_init("Stop index preparation")
return True
else:
return False
diff --git a/tests/mgbench/workloads/importers/importer_ldbc_interactive.py b/tests/mgbench/workloads/importers/importer_ldbc_interactive.py
index 3c78405b7..782ba8ebf 100644
--- a/tests/mgbench/workloads/importers/importer_ldbc_interactive.py
+++ b/tests/mgbench/workloads/importers/importer_ldbc_interactive.py
@@ -1,3 +1,15 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+# --- DISCLAIMER: This is NOT an official implementation of an LDBC Benchmark. ---
import csv
import subprocess
from pathlib import Path
@@ -53,7 +65,6 @@ class ImporterLDBCInteractive:
self._csv_dict = csv_dict
def execute_import(self):
-
vendor_runner = BaseRunner.create(
benchmark_context=self._benchmark_context,
)
@@ -63,8 +74,8 @@ class ImporterLDBCInteractive:
print("Runnning Neo4j import")
dump_dir = Path() / ".cache" / "datasets" / self._dataset_name / self._variant / "dump"
dump_dir.mkdir(parents=True, exist_ok=True)
- dir_name = self._csv_dict[self._variant].split("/")[-1:][0].removesuffix(".tar.zst")
- if (dump_dir / dir_name).exists():
+ dir_name = self._csv_dict[self._variant].split("/")[-1:][0].replace(".tar.zst", "")
+ if (dump_dir / dir_name).exists() and any((dump_dir / dir_name).iterdir()):
print("Files downloaded")
dump_dir = dump_dir / dir_name
else:
@@ -153,10 +164,10 @@ class ImporterLDBCInteractive:
check=True,
)
- vendor_runner.start_preparation("Index preparation")
+ vendor_runner.start_db_init("Index preparation")
print("Executing database index setup")
client.execute(file_path=self._index_file, num_workers=1)
- vendor_runner.stop("Stop index preparation")
+ vendor_runner.stop_db_init("Stop index preparation")
return True
else:
diff --git a/tests/mgbench/workloads/importers/importer_pokec.py b/tests/mgbench/workloads/importers/importer_pokec.py
index ee6621369..d179b26d5 100644
--- a/tests/mgbench/workloads/importers/importer_pokec.py
+++ b/tests/mgbench/workloads/importers/importer_pokec.py
@@ -1,5 +1,17 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
from pathlib import Path
+import log
from benchmark_context import BenchmarkContext
from runners import BaseRunner
@@ -16,26 +28,22 @@ class ImporterPokec:
def execute_import(self):
if self._benchmark_context.vendor_name == "neo4j":
-
+ neo4j_dump = Path() / ".cache" / "datasets" / self._dataset_name / self._variant / "neo4j.dump"
vendor_runner = BaseRunner.create(
benchmark_context=self._benchmark_context,
)
- client = vendor_runner.fetch_client()
vendor_runner.clean_db()
- vendor_runner.start_preparation("preparation")
- print("Executing database cleanup and index setup...")
- client.execute(file_path=self._index_file, num_workers=1)
- vendor_runner.stop("preparation")
- neo4j_dump = Path() / ".cache" / "datasets" / self._dataset_name / self._variant / "neo4j.dump"
if neo4j_dump.exists():
+ log.log("Loading database from existing dump...")
vendor_runner.load_db_from_dump(path=neo4j_dump.parent)
else:
- vendor_runner.start_preparation("import")
+ client = vendor_runner.fetch_client()
+ vendor_runner.start_db_init("import")
+ print("Executing database index setup...")
+ client.execute(file_path=self._index_file, num_workers=1)
print("Importing dataset...")
client.execute(file_path=self._dataset_file, num_workers=self._benchmark_context.num_workers_for_import)
- vendor_runner.stop("import")
- vendor_runner.dump_db(path=neo4j_dump.parent)
-
+ vendor_runner.stop_db_init("import")
return True
else:
return False
diff --git a/tests/mgbench/workloads/ldbc_bi.py b/tests/mgbench/workloads/ldbc_bi.py
index e1d28577b..d18275779 100644
--- a/tests/mgbench/workloads/ldbc_bi.py
+++ b/tests/mgbench/workloads/ldbc_bi.py
@@ -1,3 +1,15 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+# --- DISCLAIMER: This is NOT an official implementation of an LDBC Benchmark. ---
import inspect
import random
from pathlib import Path
@@ -63,7 +75,7 @@ class LDBC_BI(Workload):
print("Downloading files")
downloaded_file = helpers.download_file(self.QUERY_PARAMETERS[self._variant], parameters.parent.absolute())
print("Unpacking the file..." + downloaded_file)
- parameters = helpers.unpack_zip(Path(downloaded_file))
+ helpers.unpack_zip(Path(downloaded_file))
return parameters / ("parameters-" + self._variant)
def _get_query_parameters(self) -> dict:
@@ -71,7 +83,7 @@ class LDBC_BI(Workload):
parameters = {}
for file in self._parameters_dir.glob("bi-*.csv"):
file_name_query_id = file.name.split("-")[1][0:-4]
- func_name_id = func_name.split("_")[-1]
+ func_name_id = func_name.split("_")[-2]
if file_name_query_id == func_name_id or file_name_query_id == func_name_id + "a":
with file.open("r") as input:
lines = input.readlines()
@@ -103,7 +115,6 @@ class LDBC_BI(Workload):
self._parameters_dir = self._prepare_parameters_directory()
def benchmark__bi__query_1_analytical(self):
-
memgraph = (
"""
MATCH (message:Message)
@@ -197,7 +208,6 @@ class LDBC_BI(Workload):
return neo4j
def benchmark__bi__query_2_analytical(self):
-
memgraph = (
"""
MATCH (tag:Tag)-[:HAS_TYPE]->(:TagClass {name: $tagClass})
@@ -327,7 +337,6 @@ class LDBC_BI(Workload):
)
def benchmark__bi__query_7_analytical(self):
-
memgraph = (
"""
MATCH
@@ -622,59 +631,7 @@ class LDBC_BI(Workload):
self._get_query_parameters(),
)
- def benchmark__bi__query_17_analytical(self):
-
- memgraph = (
- """
- MATCH
- (tag:Tag {name: $tag}),
- (person1:Person)<-[:HAS_CREATOR]-(message1:Message)-[:REPLY_OF*0..]->(post1:Post)<-[:CONTAINER_OF]-(forum1:Forum),
- (message1)-[:HAS_TAG]->(tag),
- (forum1)<-[:HAS_MEMBER]->(person2:Person)<-[:HAS_CREATOR]-(comment:Comment)-[:HAS_TAG]->(tag),
- (forum1)<-[:HAS_MEMBER]->(person3:Person)<-[:HAS_CREATOR]-(message2:Message),
- (comment)-[:REPLY_OF]->(message2)-[:REPLY_OF*0..]->(post2:Post)<-[:CONTAINER_OF]-(forum2:Forum)
- MATCH (comment)-[:HAS_TAG]->(tag)
- MATCH (message2)-[:HAS_TAG]->(tag)
- OPTIONAL MATCH (forum2)-[:HAS_MEMBER]->(person1)
- WHERE forum1 <> forum2 AND message2.creationDate > message1.creationDate + duration({hours: $delta}) AND person1 IS NULL
- RETURN person1, count(DISTINCT message2) AS messageCount
- ORDER BY messageCount DESC, person1.id ASC
- LIMIT 10
- """.replace(
- "\n", ""
- ),
- self._get_query_parameters(),
- )
-
- neo4j = (
- """
- MATCH
- (tag:Tag {name: $tag}),
- (person1:Person)<-[:HAS_CREATOR]-(message1:Message)-[:REPLY_OF*0..]->(post1:Post)<-[:CONTAINER_OF]-(forum1:Forum),
- (message1)-[:HAS_TAG]->(tag),
- (forum1)<-[:HAS_MEMBER]->(person2:Person)<-[:HAS_CREATOR]-(comment:Comment)-[:HAS_TAG]->(tag),
- (forum1)<-[:HAS_MEMBER]->(person3:Person)<-[:HAS_CREATOR]-(message2:Message),
- (comment)-[:REPLY_OF]->(message2)-[:REPLY_OF*0..]->(post2:Post)<-[:CONTAINER_OF]-(forum2:Forum)
- MATCH (comment)-[:HAS_TAG]->(tag)
- MATCH (message2)-[:HAS_TAG]->(tag)
- WHERE forum1 <> forum2
- AND message2.creationDate > message1.creationDate + duration({hours: $delta})
- AND NOT (forum2)-[:HAS_MEMBER]->(person1)
- RETURN person1, count(DISTINCT message2) AS messageCount
- ORDER BY messageCount DESC, person1.id ASC
- LIMIT 10
- """.replace(
- "\n", ""
- ),
- self._get_query_parameters(),
- )
- if self._vendor == "memgraph":
- return memgraph
- else:
- return neo4j
-
def benchmark__bi__query_18_analytical(self):
-
memgraph = (
"""
MATCH (tag:Tag {name: $tag})<-[:HAS_INTEREST]-(person1:Person)-[:KNOWS]-(mutualFriend:Person)-[:KNOWS]-(person2:Person)-[:HAS_INTEREST]->(tag)
diff --git a/tests/mgbench/workloads/ldbc_interactive.py b/tests/mgbench/workloads/ldbc_interactive.py
index 576025949..71dcdb37c 100644
--- a/tests/mgbench/workloads/ldbc_interactive.py
+++ b/tests/mgbench/workloads/ldbc_interactive.py
@@ -1,3 +1,15 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+# --- DISCLAIMER: This is NOT an official implementation of an LDBC Benchmark. ---
import inspect
import random
from datetime import datetime
@@ -10,7 +22,6 @@ from workloads.importers.importer_ldbc_interactive import *
class LDBC_Interactive(Workload):
-
NAME = "ldbc_interactive"
VARIANTS = ["sf0.1", "sf1", "sf3", "sf10"]
DEFAULT_VARIANT = "sf1"
@@ -31,7 +42,7 @@ class LDBC_Interactive(Workload):
SIZES = {
"sf0.1": {"vertices": 327588, "edges": 1477965},
"sf1": {"vertices": 3181724, "edges": 17256038},
- "sf3": {"vertices": 1, "edges": 1},
+ "sf3": {"vertices": 9281922, "edges": 52695735},
"sf10": {"vertices": 1, "edges": 1},
}
@@ -44,8 +55,8 @@ class LDBC_Interactive(Workload):
QUERY_PARAMETERS = {
"sf0.1": "https://repository.surfsara.nl/datasets/cwi/snb/files/substitution_parameters/substitution_parameters-sf0.1.tar.zst",
- "sf1": "https://repository.surfsara.nl/datasets/cwi/snb/files/substitution_parameters/substitution_parameters-sf0.1.tar.zst",
- "sf3": "https://repository.surfsara.nl/datasets/cwi/snb/files/substitution_parameters/substitution_parameters-sf0.1.tar.zst",
+ "sf1": "https://repository.surfsara.nl/datasets/cwi/snb/files/substitution_parameters/substitution_parameters-sf1.tar.zst",
+ "sf3": "https://repository.surfsara.nl/datasets/cwi/snb/files/substitution_parameters/substitution_parameters-sf3.tar.zst",
}
def custom_import(self) -> bool:
@@ -61,7 +72,7 @@ class LDBC_Interactive(Workload):
def _prepare_parameters_directory(self):
parameters = Path() / ".cache" / "datasets" / self.NAME / self._variant / "parameters"
parameters.mkdir(parents=True, exist_ok=True)
- dir_name = self.QUERY_PARAMETERS[self._variant].split("/")[-1:][0].removesuffix(".tar.zst")
+ dir_name = self.QUERY_PARAMETERS[self._variant].split("/")[-1:][0].replace(".tar.zst", "")
if (parameters / dir_name).exists():
print("Files downloaded:")
parameters = parameters / dir_name
@@ -230,7 +241,6 @@ class LDBC_Interactive(Workload):
)
def benchmark__interactive__complex_query_3_analytical(self):
-
memgraph = (
"""
MATCH (countryX:Country {name: $countryXName }),
@@ -327,8 +337,9 @@ class LDBC_Interactive(Workload):
RETURN tag.name AS tagName, postCount
ORDER BY postCount DESC, tagName ASC
LIMIT 10
-
- """,
+ """.replace(
+ "\n", ""
+ ),
self._get_query_parameters(),
)
@@ -351,8 +362,9 @@ class LDBC_Interactive(Workload):
RETURN tag.name AS tagName, postCount
ORDER BY postCount DESC, tagName ASC
LIMIT 10
-
- """,
+ """.replace(
+ "\n", ""
+ ),
self._get_query_parameters(),
)
@@ -528,72 +540,6 @@ class LDBC_Interactive(Workload):
self._get_query_parameters(),
)
- def benchmark__interactive__complex_query_10_analytical(self):
- memgraph = (
- """
- MATCH (person:Person {id: $personId})-[:KNOWS*2..2]-(friend),
- (friend)-[:IS_LOCATED_IN]->(city:City)
- WHERE NOT friend=person AND
- NOT (friend)-[:KNOWS]-(person)
- WITH person, city, friend, datetime({epochMillis: friend.birthday}) as birthday
- WHERE (birthday.month=$month AND birthday.day>=21) OR
- (birthday.month=($month%12)+1 AND birthday.day<22)
- WITH DISTINCT friend, city, person
- OPTIONAL MATCH (friend)<-[:HAS_CREATOR]-(post:Post)
- WITH friend, city, collect(post) AS posts, person
- WITH friend,
- city,
- size(posts) AS postCount,
- size([p IN posts WHERE (p)-[:HAS_TAG]->()<-[:HAS_INTEREST]-(person)]) AS commonPostCount
- RETURN friend.id AS personId,
- friend.firstName AS personFirstName,
- friend.lastName AS personLastName,
- commonPostCount - (postCount - commonPostCount) AS commonInterestScore,
- friend.gender AS personGender,
- city.name AS personCityName
- ORDER BY commonInterestScore DESC, personId ASC
- LIMIT 10
- """.replace(
- "\n", ""
- ),
- self._get_query_parameters(),
- )
-
- neo4j = (
- """
- MATCH (person:Person {id: $personId})-[:KNOWS*2..2]-(friend),
- (friend)-[:IS_LOCATED_IN]->(city:City)
- WHERE NOT friend=person AND
- NOT (friend)-[:KNOWS]-(person)
- WITH person, city, friend, datetime({epochMillis: friend.birthday}) as birthday
- WHERE (birthday.month=$month AND birthday.day>=21) OR
- (birthday.month=($month%12)+1 AND birthday.day<22)
- WITH DISTINCT friend, city, person
- OPTIONAL MATCH (friend)<-[:HAS_CREATOR]-(post:Post)
- WITH friend, city, collect(post) AS posts, person
- WITH friend,
- city,
- size(posts) AS postCount,
- size([p IN posts WHERE (p)-[:HAS_TAG]->()<-[:HAS_INTEREST]-(person)]) AS commonPostCount
- RETURN friend.id AS personId,
- friend.firstName AS personFirstName,
- friend.lastName AS personLastName,
- commonPostCount - (postCount - commonPostCount) AS commonInterestScore,
- friend.gender AS personGender,
- city.name AS personCityName
- ORDER BY commonInterestScore DESC, personId ASC
- LIMIT 10
- """.replace(
- "\n", ""
- ),
- self._get_query_parameters(),
- )
-
- if self._vendor == "memgraph":
- return memgraph
- else:
- return neo4j
-
def benchmark__interactive__complex_query_11_analytical(self):
return (
"""
diff --git a/tests/mgbench/workloads/pokec.py b/tests/mgbench/workloads/pokec.py
index afecf0b6e..6733d38f2 100644
--- a/tests/mgbench/workloads/pokec.py
+++ b/tests/mgbench/workloads/pokec.py
@@ -1,3 +1,14 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
import random
from benchmark_context import BenchmarkContext
diff --git a/tests/mgbench/zip_benchgraph.py b/tests/mgbench/zip_benchgraph.py
new file mode 100644
index 000000000..9ce551376
--- /dev/null
+++ b/tests/mgbench/zip_benchgraph.py
@@ -0,0 +1,46 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import zipfile
+from pathlib import Path
+
+import log
+
+
+def zip_benchgraph():
+ log.info("Creating benchgraph.zip ...")
+ parent = Path(__file__).resolve().parent
+ zip = zipfile.ZipFile("./benchgraph.zip", "w")
+ zip.write(parent / "benchmark.py", "benchgraph/benchmark.py")
+ zip.write(parent / "setup.py", "benchgraph/setup.py")
+ zip.write(parent / "log.py", "benchgraph/log.py")
+ zip.write(parent / "benchmark_context.py", "benchgraph/benchmark_context.py")
+ zip.write(parent / "validation.py", "benchgraph/validation.py")
+ zip.write(parent / "compare_results.py", "benchgraph/compare_results.py")
+ zip.write(parent / "runners.py", "benchgraph/runners.py")
+ zip.write(parent / "helpers.py", "benchgraph/helpers.py")
+ zip.write(parent / "graph_bench.py", "benchgraph/graph_bench.py")
+ zip.write(parent / "README.md", "benchgraph/README.md")
+ zip.write(parent / "how_to_use_benchgraph.md", "benchgraph/how_to_use_benchgraph.md")
+ zip.write(parent / "workloads/__init__.py", "benchgraph/workloads/__init__.py")
+ zip.write(parent / "workloads/base.py", "benchgraph/workloads/base.py")
+ zip.write(parent / "workloads/demo.py", "benchgraph/workloads/demo.py")
+
+ zip.close()
+
+
+if __name__ == "__main__":
+ zip_benchgraph()
+
+ if Path("./benchgraph.zip").is_file():
+ log.success("benchgraph.zip created successfully")
+ else:
+ log.error("benchgraph.zip was not created")