Migrate harness to use our bolt client

Reviewers: mislav.bradac
Reviewed By: mislav.bradac
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D711

parent 19bec4acc8
commit 0914c5a941
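
Summary: replace the Python bolt_client.py wrapper (and its run_bolt_client shell script) with a compiled harness_client binary built on our own Bolt client. The harness now locates the binary under build/ (falling back to build_release/ on Apollo), passes the endpoint as separate --address and --port flags, and reads the measurement JSON back from a temporary output file.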
@@ -16,10 +16,18 @@
 # if set to -1 the GC will not run
 --gc-cycle-sec=-1

+# skiplist gc cycle interval
+# if set to 0 the GC will not run
+--skiplist_gc_interval=0
+
 # snapshot cycle interval
 # if set to -1 the snapshooter will not run
 --snapshot-cycle-sec=-1

+# maximum query execution time
+# if set to -1 query execution time is not limited
+--query_execution_time_sec=-1
+
 # create snapshot disabled on db exit
 --snapshot-on-db-exit=false
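
With these settings every form of background work (storage GC, skiplist GC, snapshotting) is disabled and the query execution time limit is lifted, so periodic maintenance cannot perturb the latency measurements this config is meant for.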
@@ -27,6 +27,9 @@ endif (UNIX)
 # benchmark test binaries
 add_subdirectory(${PROJECT_SOURCE_DIR}/benchmark)

+# macro_benchmark test binaries
+add_subdirectory(${PROJECT_SOURCE_DIR}/macro_benchmark)
+
 # concurrent test binaries
 add_subdirectory(${PROJECT_SOURCE_DIR}/concurrent)
tests/macro_benchmark/CMakeLists.txt (new file, 31 lines)
@@ -0,0 +1,31 @@
+find_package(Threads REQUIRED)
+
+# set current directory name as a test type
+get_filename_component(test_type ${CMAKE_CURRENT_SOURCE_DIR} NAME)
+
+# get all cpp abs file names recursively starting from current directory
+file(GLOB_RECURSE test_type_cpps *.cpp)
+message(STATUS "Available ${test_type} cpp files are: ${test_type_cpps}")
+
+# for each cpp file build binary and register test
+foreach(test_cpp ${test_type_cpps})
+
+  # get exec name (remove extension from the abs path)
+  get_filename_component(exec_name ${test_cpp} NAME_WE)
+
+  # set target name in format {project_name}__{test_type}__{exec_name}
+  set(target_name ${project_name}__${test_type}__${exec_name})
+
+  # build exec file
+  add_executable(${target_name} ${test_cpp})
+  set_property(TARGET ${target_name} PROPERTY CXX_STANDARD ${cxx_standard})
+
+  # OUTPUT_NAME sets the real name of a target when it is built and can be
+  # used to help create two targets of the same name even though CMake
+  # requires unique logical target names
+  set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
+
+  # link libraries
+  target_link_libraries(${target_name} memgraph_lib)
+
+endforeach()
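
Under this scheme, a harness_client.cpp in this directory builds as the logical target memgraph__macro_benchmark__harness_client while the binary on disk is named plain harness_client — exactly the target name the release build script below asks for.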
bolt_client.py (deleted, 104 lines)
@@ -1,104 +0,0 @@
-#!/usr/bin/env python3
-
-"""
-A python script that launches the memgraph client,
-executes a query and prints out a JSON dict of measurements
-to stdout.
-
-Takes a number of cmd-line arguments of the following structure:
-    Positional, mandatory:
-        - db_uri
-        - query
-    Named, optional:
-        - encrypt
-
-Requires the database URI to be passed as a cmd-line argument.
-
-The dict that is printed out contains:
-    - return_code of the client execution process
-    - error_msg (empty if not applicable)
-    - metadata dict
-
-Note that 'metadata' is only valid if the return_code is 0.
-"""
-
-import sys
-import os
-import time
-import json
-import io
-from contextlib import redirect_stderr
-from multiprocessing import Pool
-from functools import partial
-
-# tests/stress dir, that's the place of common.py.
-sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(
-    os.path.realpath(__file__)))), "stress"))
-from common import connection_argument_parser, execute_till_success, \
-    argument_driver
-
-
-# string constants
-RETURN_CODE = "return_code"
-ERROR_MSG = "error_msg"
-WALL_TIME = "wall_time"
-
-
-def _prepare_for_json(obj):
-    if isinstance(obj, dict):
-        return {k: _prepare_for_json(v) for k, v in obj.items()}
-    if isinstance(obj, list):
-        return [_prepare_for_json(elem) for elem in obj]
-    if isinstance(obj, (str, int, float, type(None))):
-        return obj
-    return None
-
-
-def _print_dict(fname, d):
-    with open(fname, "a") as f:
-        f.write(json.dumps(_prepare_for_json(d), indent=2))
-
-
-def _run_query(args, query, self):
-    if not hasattr(self, "driver"):
-        # TODO: this driver and session is never closed.
-        self.driver = argument_driver(args)
-        self.session = self.driver.session()
-    return execute_till_success(self.session, query)[2]
-_run_query.__defaults__ = (_run_query,)
-
-
-def main():
-    argp = connection_argument_parser()
-    argp.add_argument("--num-workers", type=int, default=1)
-    argp.add_argument("--output", type=str)
-
-    # Parse args and ensure that stdout is not polluted by argument parsing.
-    try:
-        f = io.StringIO()
-        with redirect_stderr(f):
-            args = argp.parse_args()
-    except:
-        _print_dict(args.output,
-                    {RETURN_CODE: 1, ERROR_MSG: "Invalid cmd-line arguments"})
-        sys.exit(1)
-
-    queries = filter(lambda x: x.strip() != '', sys.stdin.read().split("\n"))
-
-    # Execute the queries.
-    metadatas = []
-    with Pool(args.num_workers) as pool:
-        start = time.time()
-        metadatas = list(pool.map(partial(_run_query, args), queries))
-        end = time.time()
-    delta_time = end - start
-
-    _print_dict(args.output, {
-        RETURN_CODE: 0,
-        WALL_TIME: (None if not queries else delta_time),
-        "metadatas": metadatas
-    })
-
-
-if __name__ == '__main__':
-    main()
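
Everything this script did — fanning queries out across a multiprocessing.Pool of Python driver sessions and emitting a JSON dict with the return code, wall time, and per-query metadata — is taken over by the compiled harness_client, which uses native threads and our own Bolt client instead.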
@@ -319,10 +319,9 @@ class _BaseRunner:

     def _get_argparser(self):
         argp = ArgumentParser("RunnerArgumentParser")
-        # TODO: These two options should be passed two database and client, not
-        # only client as we are doing at the moment.
+        # TODO: This option should be passed to the database and client, not
+        # only to the client as we are doing at the moment.
         argp.add_argument("--RunnerUri", default="127.0.0.1:7687")
         argp.add_argument("--RunnerEncryptBolt", action="store_true")
         return argp

     def start(self):
@@ -331,14 +330,17 @@ class _BaseRunner:

     def execute(self, queries, num_client_workers):
         self.log.debug("execute('%s')", str(queries))
-        client = path.join(DIR_PATH, "run_bolt_client")
-        client_args = ["--endpoint", self.args.RunnerUri]
-        client_args += ["--num-workers", str(num_client_workers)]
-        output_fd, output = tempfile.mkstemp()
-        os.close(output_fd)
-        client_args += ["--output", output]
-        if self.args.RunnerEncryptBolt:
-            client_args.append("--ssl-enabled")
-
+        client = os.path.normpath(os.path.join(DIR_PATH,
+            "../../../build/tests/macro_benchmark/harness_client"))
+        if not os.path.exists(client):
+            # Apollo builds both debug and release binaries on diff
+            # so we need to use the release client if the debug one
+            # doesn't exist
+            client = os.path.normpath(os.path.join(DIR_PATH,
+                "../../../build_release/tests/macro_benchmark/"
+                "harness_client"))
+
         queries_fd, queries_path = tempfile.mkstemp()
         try:
             queries_file = os.fdopen(queries_fd, "w")
@@ -349,6 +351,14 @@ class _BaseRunner:
             os.remove(queries_path)
             raise Exception("Writing queries to temporary file failed")

+        output_fd, output = tempfile.mkstemp()
+        os.close(output_fd)
+
+        address, port = self.args.RunnerUri.split(":")
+        client_args = ["--address", address, "--port", port,
+                       "--num-workers", str(num_client_workers),
+                       "--output", output]
+
         cpu_time_start = self.database_bin.get_usage()["cpu"]
         # TODO make the timeout configurable per query or something
         return_code = self.bolt_client.run_and_wait(
@@ -362,10 +372,14 @@ class _BaseRunner:
                 "Failed with return_code %d and stderr:\n%s",
                 str(queries), return_code, stderr)
             raise Exception("BoltClient execution failed")

         with open(output) as f:
             data = json.loads(f.read())
-        data[CPU_TIME] = cpu_time_end - cpu_time_start
-        return data
+        data[CPU_TIME] = cpu_time_end - cpu_time_start
+
+        os.remove(output)
+
+        return data

     def stop(self):
         self.log.info("stop")
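
Taken together, execute() now shells out to the compiled client. A minimal sketch of the interaction, assuming only the flags visible in this diff (--address, --port, --num-workers, --output) and queries fed on stdin; the run_harness_client helper and the 600 s timeout are illustrative, not part of the commit:

import json
import os
import subprocess
import tempfile

def run_harness_client(client, uri, queries, num_client_workers=1):
    # split "127.0.0.1:7687" into the --address and --port flags
    address, port = uri.split(":")
    # the client writes its JSON measurements into this file
    output_fd, output = tempfile.mkstemp()
    os.close(output_fd)
    try:
        proc = subprocess.run(
            [client, "--address", address, "--port", port,
             "--num-workers", str(num_client_workers),
             "--output", output],
            input="\n".join(queries).encode(),
            timeout=600)
        if proc.returncode != 0:
            raise Exception("BoltClient execution failed")
        with open(output) as f:
            return json.loads(f.read())
    finally:
        os.remove(output)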
@@ -391,7 +405,7 @@ class MemgraphRunner(_BaseRunner):
         argp.add_argument("--RunnerConfig",
                           default=os.path.normpath(os.path.join(
                               DIR_PATH,
-                              "../../../config/benchmarking.conf")))
+                              "../../../config/benchmarking_latency.conf")))
         # parse args
         self.log.info("Initializing Runner with arguments %r", args)
         self.args, _ = argp.parse_known_args(args)
@@ -18,12 +18,12 @@ using DecodedValueT = communication::bolt::DecodedValue;

 DEFINE_string(address, "127.0.0.1", "Server address");
 DEFINE_string(port, "7687", "Server port");
-DEFINE_uint64(num_workers, 1, "Number of workers");
+DEFINE_int32(num_workers, 1, "Number of workers");
 DEFINE_string(output, "", "Output file");
 DEFINE_string(username, "", "Username for the database");
 DEFINE_string(password, "", "Password for the database");

-const uint64_t MAX_RETRIES = 1000;
+const int MAX_RETRIES = 1000;

 void PrintJsonDecodedValue(std::ostream &os, const DecodedValueT &value) {
   switch (value.type()) {
@@ -91,7 +91,7 @@ int main(int argc, char **argv) {
   std::string query;
   std::vector<std::thread> threads;

-  SpinLock mutex;
+  SpinLock spinlock;
   uint64_t last = 0;
   std::vector<std::string> queries;
   std::vector<std::map<std::string, DecodedValueT>> metadata;
@@ -119,21 +119,21 @@ int main(int argc, char **argv) {

       ClientT client(std::move(socket), FLAGS_username, FLAGS_password);

-      uint64_t pos, i;
       std::string str;
       while (true) {
+        uint64_t pos;
         {
-          std::lock_guard<SpinLock> lock(mutex);
+          std::lock_guard<SpinLock> lock(spinlock);
           if (last == queries.size()) {
             break;
           }
           pos = last++;
           str = queries[pos];
         }
+        int i;
         for (i = 0; i < MAX_RETRIES; ++i) {
           try {
             auto ret = client.Execute(str, {});
-            std::lock_guard<SpinLock> lock(mutex);
+            std::lock_guard<SpinLock> lock(spinlock);
             metadata[pos] = ret.metadata;
             break;
           } catch (const communication::bolt::ClientQueryException &e) {
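
The worker loop is a spinlock-guarded work queue: each thread claims the next query by bumping the shared last index under the lock, executes it with up to MAX_RETRIES attempts, and writes the returned metadata into the slot for that query's position, so results stay aligned with the input order no matter which thread ran the query.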
run_bolt_client (deleted, 7 lines)
@@ -1,7 +0,0 @@
-#!/usr/bin/env bash
-
-DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-cd $DIR
-source ../ve3/bin/activate
-python3 bolt_client.py $@
-exit $?
@@ -21,7 +21,7 @@ mkdir build_release

 cd build_release
 cmake -DCMAKE_BUILD_TYPE=release ..
-TIMEOUT=1000 make -j$THREADS memgraph_link_target
+TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark__harness_client

 cd ../../parent
@@ -192,9 +192,10 @@ binary_release_link_path = os.path.join(BUILD_RELEASE_DIR, "memgraph")
 # macro benchmark tests
 MACRO_BENCHMARK_ARGS = "QuerySuite MemgraphRunner --groups aggregation"
 macro_bench_path = os.path.join(BASE_DIR, "tests", "macro_benchmark")
-stress_common = os.path.join(BASE_DIR, "tests", "stress", "common.py")
+harness_client_binary = os.path.join(BUILD_RELEASE_DIR, "tests",
+                                     "macro_benchmark", "harness_client")
 infile = create_archive("macro_benchmark", [binary_release_path,
-                        macro_bench_path, stress_common, config_path],
+                        macro_bench_path, config_path, harness_client_binary],
                         cwd = WORKSPACE_DIR)
 supervisor = "./{}/tests/macro_benchmark/harness/harness.py".format(BASE_DIR_NAME)
 args = MACRO_BENCHMARK_ARGS + " --RunnerBin " + binary_release_path