Add parser stress test (#463)

This commit is contained in:
Marko Budiselić 2022-07-26 20:54:56 +02:00 committed by GitHub
parent 351258ace8
commit 80e0e439b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 163 additions and 56 deletions

View File

@ -199,7 +199,7 @@ git apply ../rocksdb.patch
popd
# mgclient
mgclient_tag="96e95c6845463cbe88948392be58d26da0d5ffd3" # (2022-02-08)
mgclient_tag="v1.4.0" # (2022-06-14)
repo_clone_try_double "${primary_urls[mgclient]}" "${secondary_urls[mgclient]}" "mgclient" "$mgclient_tag"
sed -i 's/\${CMAKE_INSTALL_LIBDIR}/lib/' mgclient/src/CMakeLists.txt

View File

@ -17,3 +17,6 @@ endfunction(add_stress_test)
add_stress_test(long_running.cpp)
target_link_libraries(${test_prefix}long_running mg-communication mg-io mg-utils)
add_stress_test(parser.cpp)
target_link_libraries(${test_prefix}parser mg-communication mg-io mg-utils mgclient)

View File

@ -26,6 +26,11 @@ SMALL_DATASET = [
"options": ["--vertex-count", "40000", "--create-pack-size", "100"],
"timeout": 5,
},
{
"test": "parser.cpp",
"options": ["--per-worker-query-count", "1000"],
"timeout": 5,
},
{
"test": "long_running.cpp",
"options": ["--vertex-count", "1000", "--edge-count", "5000", "--max-time", "1", "--verify", "20"],
@ -42,30 +47,35 @@ SMALL_DATASET = [
# bipartite.py and create_match.py run for approx. 15min
# long_running runs for 5min x 6 times = 30min
# long_running runs for 8h
LARGE_DATASET = [
{
"test": "bipartite.py",
"options": ["--u-count", "300", "--v-count", "300"],
"timeout": 30,
},
{
"test": "create_match.py",
"options": ["--vertex-count", "500000", "--create-pack-size", "500"],
"timeout": 30,
},
] + [
{
"test": "long_running.cpp",
"options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"],
"timeout": 16,
},
] * 6 + [
{
"test": "long_running.cpp",
"options": ["--vertex-count", "200000", "--edge-count", "1000000", "--max-time", "480", "--verify", "300"],
"timeout": 500,
},
]
LARGE_DATASET = (
[
{
"test": "bipartite.py",
"options": ["--u-count", "300", "--v-count", "300"],
"timeout": 30,
},
{
"test": "create_match.py",
"options": ["--vertex-count", "500000", "--create-pack-size", "500"],
"timeout": 30,
},
]
+ [
{
"test": "long_running.cpp",
"options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"],
"timeout": 16,
},
]
* 6
+ [
{
"test": "long_running.cpp",
"options": ["--vertex-count", "200000", "--edge-count", "1000000", "--max-time", "480", "--verify", "300"],
"timeout": 500,
},
]
)
# paths
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -101,8 +111,7 @@ def run_test(args, test, options, timeout):
# find binary
if test.endswith(".py"):
logging = "DEBUG" if args.verbose else "WARNING"
binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test),
"--logging", logging]
binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test), "--logging", logging]
elif test.endswith(".cpp"):
exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4])
binary = [exe]
@ -112,11 +121,10 @@ def run_test(args, test, options, timeout):
# start test
cmd = binary + ["--worker-count", str(THREADS)] + options
start = time.time()
ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, timeout = timeout * 60)
ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60)
if ret_test.returncode != 0:
raise Exception("Test '{}' binary returned non-zero ({})!".format(
test, ret_test.returncode))
raise Exception("Test '{}' binary returned non-zero ({})!".format(test, ret_test.returncode))
runtime = time.time() - start
print(" Done after {:.3f} seconds".format(runtime))
@ -125,39 +133,54 @@ def run_test(args, test, options, timeout):
# parse arguments
parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.")
parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR,
"memgraph"))
parser.add_argument("--log-file", default = "")
parser.add_argument("--data-directory", default = "")
parser.add_argument("--python", default = os.path.join(SCRIPT_DIR,
"ve3", "bin", "python3"), type = str)
parser.add_argument("--large-dataset", action = "store_const",
const = True, default = False)
parser.add_argument("--use-ssl", action = "store_const",
const = True, default = False)
parser.add_argument("--verbose", action = "store_const",
const = True, default = False)
parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph"))
parser.add_argument("--log-file", default="")
parser.add_argument("--data-directory", default="")
parser.add_argument("--python", default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"), type=str)
parser.add_argument("--large-dataset", action="store_const", const=True, default=False)
parser.add_argument("--use-ssl", action="store_const", const=True, default=False)
parser.add_argument("--verbose", action="store_const", const=True, default=False)
args = parser.parse_args()
# generate temporary SSL certs
if args.use_ssl:
# https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively
subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com"
subprocess.run(["openssl", "req", "-new", "-newkey", "rsa:4096",
"-days", "365", "-nodes", "-x509", "-subj", subj,
"-keyout", KEY_FILE, "-out", CERT_FILE], check=True)
subprocess.run(
[
"openssl",
"req",
"-new",
"-newkey",
"rsa:4096",
"-days",
"365",
"-nodes",
"-x509",
"-subj",
subj,
"-keyout",
KEY_FILE,
"-out",
CERT_FILE,
],
check=True,
)
# start memgraph
cwd = os.path.dirname(args.memgraph)
cmd = [args.memgraph, "--bolt-num-workers=" + str(THREADS),
"--storage-properties-on-edges=true",
"--storage-snapshot-on-exit=true",
"--storage-snapshot-interval-sec=600",
"--storage-snapshot-retention-count=1",
"--storage-wal-enabled=true",
"--storage-recover-on-startup=false",
"--query-execution-timeout-sec=1200"]
cmd = [
args.memgraph,
"--bolt-num-workers=" + str(THREADS),
"--storage-properties-on-edges=true",
"--storage-snapshot-on-exit=true",
"--storage-snapshot-interval-sec=600",
"--storage-snapshot-retention-count=1",
"--storage-wal-enabled=true",
"--storage-recover-on-startup=false",
"--query-execution-timeout-sec=1200",
]
if not args.verbose:
cmd += ["--log-level", "WARNING"]
if args.log_file:
@ -166,7 +189,7 @@ if args.data_directory:
cmd += ["--data-directory", args.data_directory]
if args.use_ssl:
cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE]
proc_mg = subprocess.Popen(cmd, cwd = cwd)
proc_mg = subprocess.Popen(cmd, cwd=cwd)
wait_for_server(7687)
assert proc_mg.poll() is None, "The database binary died prematurely!"
@ -174,10 +197,12 @@ assert proc_mg.poll() is None, "The database binary died prematurely!"
@atexit.register
def cleanup():
global proc_mg
if proc_mg.poll() != None: return
if proc_mg.poll() != None:
return
proc_mg.kill()
proc_mg.wait()
# run tests
runtimes = {}
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET

79
tests/stress/parser.cpp Normal file
View File

@ -0,0 +1,79 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <limits>
#include <random>
#include <thread>
#include <fmt/format.h>
#include <gflags/gflags.h>
#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "mgclient.hpp"
#include "utils/timer.hpp"
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_int32(worker_count, 1, "The number of concurrent workers executing queries against the server.");
DEFINE_int32(per_worker_query_count, 100, "The number of queries each worker will try to execute.");
auto make_client() {
mg::Client::Params params;
params.host = FLAGS_address;
params.port = static_cast<uint16_t>(FLAGS_port);
params.use_ssl = FLAGS_use_ssl;
return mg::Client::Connect(params);
}
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
mg::Client::Init();
spdlog::info("Cleaning the database instance...");
auto client = make_client();
client->Execute("MATCH (n) DETACH DELETE n");
client->DiscardAll();
spdlog::info(fmt::format("Starting parser stress test with {} workers and {} queries per worker...",
FLAGS_worker_count, FLAGS_per_worker_query_count));
std::vector<std::thread> threads;
memgraph::utils::Timer timer;
for (int i = 0; i < FLAGS_worker_count; ++i) {
threads.push_back(std::thread([]() {
auto client = make_client();
std::mt19937 generator{std::random_device{}()};
std::uniform_int_distribution<uint64_t> distribution{std::numeric_limits<uint64_t>::min(),
std::numeric_limits<uint64_t>::max()};
for (int i = 0; i < FLAGS_per_worker_query_count; ++i) {
try {
auto is_executed = client->Execute(fmt::format("MATCH (n:Label{}) RETURN n;", distribution(generator)));
if (!is_executed) {
LOG_FATAL("One of the parser stress test queries failed.");
}
client->FetchAll();
} catch (const std::exception &e) {
LOG_FATAL("One of the parser stress test queries failed.");
}
}
}));
}
std::ranges::for_each(threads, [](auto &t) { t.join(); });
spdlog::info(
fmt::format("All queries executed in {:.4f}s. The parser managed to handle the load.", timer.Elapsed().count()));
mg::Client::Finalize();
return 0;
}