diff --git a/libs/setup.sh b/libs/setup.sh index dd59d5d3a..d902aadb1 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -199,7 +199,7 @@ git apply ../rocksdb.patch popd # mgclient -mgclient_tag="96e95c6845463cbe88948392be58d26da0d5ffd3" # (2022-02-08) +mgclient_tag="v1.4.0" # (2022-06-14) repo_clone_try_double "${primary_urls[mgclient]}" "${secondary_urls[mgclient]}" "mgclient" "$mgclient_tag" sed -i 's/\${CMAKE_INSTALL_LIBDIR}/lib/' mgclient/src/CMakeLists.txt diff --git a/tests/stress/CMakeLists.txt b/tests/stress/CMakeLists.txt index f4ca745fa..18b784dc0 100644 --- a/tests/stress/CMakeLists.txt +++ b/tests/stress/CMakeLists.txt @@ -17,3 +17,6 @@ endfunction(add_stress_test) add_stress_test(long_running.cpp) target_link_libraries(${test_prefix}long_running mg-communication mg-io mg-utils) + +add_stress_test(parser.cpp) +target_link_libraries(${test_prefix}parser mg-communication mg-io mg-utils mgclient) diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration index 697f5158d..87d5bcaf6 100755 --- a/tests/stress/continuous_integration +++ b/tests/stress/continuous_integration @@ -26,6 +26,11 @@ SMALL_DATASET = [ "options": ["--vertex-count", "40000", "--create-pack-size", "100"], "timeout": 5, }, + { + "test": "parser.cpp", + "options": ["--per-worker-query-count", "1000"], + "timeout": 5, + }, { "test": "long_running.cpp", "options": ["--vertex-count", "1000", "--edge-count", "5000", "--max-time", "1", "--verify", "20"], @@ -42,30 +47,35 @@ SMALL_DATASET = [ # bipartite.py and create_match.py run for approx. 15min # long_running runs for 5min x 6 times = 30min # long_running runs for 8h -LARGE_DATASET = [ - { - "test": "bipartite.py", - "options": ["--u-count", "300", "--v-count", "300"], - "timeout": 30, - }, - { - "test": "create_match.py", - "options": ["--vertex-count", "500000", "--create-pack-size", "500"], - "timeout": 30, - }, -] + [ - { - "test": "long_running.cpp", - "options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"], - "timeout": 16, - }, -] * 6 + [ - { - "test": "long_running.cpp", - "options": ["--vertex-count", "200000", "--edge-count", "1000000", "--max-time", "480", "--verify", "300"], - "timeout": 500, - }, -] +LARGE_DATASET = ( + [ + { + "test": "bipartite.py", + "options": ["--u-count", "300", "--v-count", "300"], + "timeout": 30, + }, + { + "test": "create_match.py", + "options": ["--vertex-count", "500000", "--create-pack-size", "500"], + "timeout": 30, + }, + ] + + [ + { + "test": "long_running.cpp", + "options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"], + "timeout": 16, + }, + ] + * 6 + + [ + { + "test": "long_running.cpp", + "options": ["--vertex-count", "200000", "--edge-count", "1000000", "--max-time", "480", "--verify", "300"], + "timeout": 500, + }, + ] +) # paths SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -101,8 +111,7 @@ def run_test(args, test, options, timeout): # find binary if test.endswith(".py"): logging = "DEBUG" if args.verbose else "WARNING" - binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test), - "--logging", logging] + binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test), "--logging", logging] elif test.endswith(".cpp"): exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4]) binary = [exe] @@ -112,11 +121,10 @@ def run_test(args, test, options, timeout): # start test cmd = binary + ["--worker-count", str(THREADS)] + options start = time.time() - ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, timeout = timeout * 60) + ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60) if ret_test.returncode != 0: - raise Exception("Test '{}' binary returned non-zero ({})!".format( - test, ret_test.returncode)) + raise Exception("Test '{}' binary returned non-zero ({})!".format(test, ret_test.returncode)) runtime = time.time() - start print(" Done after {:.3f} seconds".format(runtime)) @@ -125,39 +133,54 @@ def run_test(args, test, options, timeout): # parse arguments -parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.") -parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR, - "memgraph")) -parser.add_argument("--log-file", default = "") -parser.add_argument("--data-directory", default = "") -parser.add_argument("--python", default = os.path.join(SCRIPT_DIR, - "ve3", "bin", "python3"), type = str) -parser.add_argument("--large-dataset", action = "store_const", - const = True, default = False) -parser.add_argument("--use-ssl", action = "store_const", - const = True, default = False) -parser.add_argument("--verbose", action = "store_const", - const = True, default = False) +parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.") +parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph")) +parser.add_argument("--log-file", default="") +parser.add_argument("--data-directory", default="") +parser.add_argument("--python", default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"), type=str) +parser.add_argument("--large-dataset", action="store_const", const=True, default=False) +parser.add_argument("--use-ssl", action="store_const", const=True, default=False) +parser.add_argument("--verbose", action="store_const", const=True, default=False) args = parser.parse_args() # generate temporary SSL certs if args.use_ssl: # https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com" - subprocess.run(["openssl", "req", "-new", "-newkey", "rsa:4096", - "-days", "365", "-nodes", "-x509", "-subj", subj, - "-keyout", KEY_FILE, "-out", CERT_FILE], check=True) + subprocess.run( + [ + "openssl", + "req", + "-new", + "-newkey", + "rsa:4096", + "-days", + "365", + "-nodes", + "-x509", + "-subj", + subj, + "-keyout", + KEY_FILE, + "-out", + CERT_FILE, + ], + check=True, + ) # start memgraph cwd = os.path.dirname(args.memgraph) -cmd = [args.memgraph, "--bolt-num-workers=" + str(THREADS), - "--storage-properties-on-edges=true", - "--storage-snapshot-on-exit=true", - "--storage-snapshot-interval-sec=600", - "--storage-snapshot-retention-count=1", - "--storage-wal-enabled=true", - "--storage-recover-on-startup=false", - "--query-execution-timeout-sec=1200"] +cmd = [ + args.memgraph, + "--bolt-num-workers=" + str(THREADS), + "--storage-properties-on-edges=true", + "--storage-snapshot-on-exit=true", + "--storage-snapshot-interval-sec=600", + "--storage-snapshot-retention-count=1", + "--storage-wal-enabled=true", + "--storage-recover-on-startup=false", + "--query-execution-timeout-sec=1200", +] if not args.verbose: cmd += ["--log-level", "WARNING"] if args.log_file: @@ -166,7 +189,7 @@ if args.data_directory: cmd += ["--data-directory", args.data_directory] if args.use_ssl: cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE] -proc_mg = subprocess.Popen(cmd, cwd = cwd) +proc_mg = subprocess.Popen(cmd, cwd=cwd) wait_for_server(7687) assert proc_mg.poll() is None, "The database binary died prematurely!" @@ -174,10 +197,12 @@ assert proc_mg.poll() is None, "The database binary died prematurely!" @atexit.register def cleanup(): global proc_mg - if proc_mg.poll() != None: return + if proc_mg.poll() != None: + return proc_mg.kill() proc_mg.wait() + # run tests runtimes = {} dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET diff --git a/tests/stress/parser.cpp b/tests/stress/parser.cpp new file mode 100644 index 000000000..b20cdd257 --- /dev/null +++ b/tests/stress/parser.cpp @@ -0,0 +1,79 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include <limits> +#include <random> +#include <thread> + +#include <fmt/format.h> +#include <gflags/gflags.h> + +#include "communication/bolt/client.hpp" +#include "io/network/endpoint.hpp" +#include "mgclient.hpp" +#include "utils/timer.hpp" + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_int32(port, 7687, "Server port"); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); +DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); +DEFINE_int32(worker_count, 1, "The number of concurrent workers executing queries against the server."); +DEFINE_int32(per_worker_query_count, 100, "The number of queries each worker will try to execute."); + +auto make_client() { + mg::Client::Params params; + params.host = FLAGS_address; + params.port = static_cast<uint16_t>(FLAGS_port); + params.use_ssl = FLAGS_use_ssl; + return mg::Client::Connect(params); +} + +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + mg::Client::Init(); + + spdlog::info("Cleaning the database instance..."); + auto client = make_client(); + client->Execute("MATCH (n) DETACH DELETE n"); + client->DiscardAll(); + + spdlog::info(fmt::format("Starting parser stress test with {} workers and {} queries per worker...", + FLAGS_worker_count, FLAGS_per_worker_query_count)); + std::vector<std::thread> threads; + memgraph::utils::Timer timer; + for (int i = 0; i < FLAGS_worker_count; ++i) { + threads.push_back(std::thread([]() { + auto client = make_client(); + std::mt19937 generator{std::random_device{}()}; + std::uniform_int_distribution<uint64_t> distribution{std::numeric_limits<uint64_t>::min(), + std::numeric_limits<uint64_t>::max()}; + for (int i = 0; i < FLAGS_per_worker_query_count; ++i) { + try { + auto is_executed = client->Execute(fmt::format("MATCH (n:Label{}) RETURN n;", distribution(generator))); + if (!is_executed) { + LOG_FATAL("One of the parser stress test queries failed."); + } + client->FetchAll(); + } catch (const std::exception &e) { + LOG_FATAL("One of the parser stress test queries failed."); + } + } + })); + } + + std::ranges::for_each(threads, [](auto &t) { t.join(); }); + spdlog::info( + fmt::format("All queries executed in {:.4f}s. The parser managed to handle the load.", timer.Elapsed().count())); + mg::Client::Finalize(); + + return 0; +}