Added continuous_integration script to stress tests.

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D620
This commit is contained in:
Matej Ferencevic 2017-08-02 10:48:33 +02:00
parent 374fda2a9d
commit 2ba3df942b
7 changed files with 196 additions and 28 deletions

37
config/stress.conf Normal file
View File

@ -0,0 +1,37 @@
# MEMGRAPH DEFAULT TESTING CONFIG
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# directory to the codes which will be compiled
--compile-directory=compiled
# path to the template (cpp) for codes generation
--template-cpp-path=template/plan_template_cpp
# directory to the folder with snapshots
--snapshot-directory=snapshots
# cleaning cycle interval
# if set to -1 the GC will not run
--gc-cycle-sec=30
# snapshot cycle interval
# if set to -1 the snapshooter will not run
--snapshot-cycle-sec=600
# create snapshot enabled on db exit
--snapshot-on-db-exit=true
# max number of snapshots which will be kept on the disk at some point
# if set to -1 the max number of snapshots is unlimited
--max-retained-snapshots=1
# by default query engine runs in interpret mode
--interpret=true
# database recovering is disabled by default
--recover-on-startup=false
# use ast caching
--ast-cache=true

5
init
View File

@ -91,4 +91,9 @@ cd tests/macro_benchmark
./init
cd ../..
# setup stress dependencies
cd tests/stress
./init
cd ../..
echo "Done installing dependencies for Memgraph"

View File

@ -20,12 +20,12 @@ def parse_args():
:return: parsed arguments
'''
parser = connection_argument_parser()
parser.add_argument('--no-workers', type=int,
parser.add_argument('--thread-count', type=int,
default=multiprocessing.cpu_count(),
help='Number of concurrent workers.')
parser.add_argument('--no-u', type=int, default=100,
parser.add_argument('--u-count', type=int, default=100,
help='Size of U set in the bipartite graph.')
parser.add_argument('--no-v', type=int, default=100,
parser.add_argument('--v-count', type=int, default=100,
help='Size of V set in the bipartite graph.')
parser.add_argument('--vertex-batch-size', type=int, default=100,
help="Create vertices in batches of this size.")
@ -58,7 +58,7 @@ def create_u_v_edges(u):
if args.edge_batching:
# TODO: try to randomize execution, the execution time should
# be smaller, add randomize flag
for v_id_batch in batch(range(args.no_v), args.edge_batch_size):
for v_id_batch in batch(range(args.v_count), args.edge_batch_size):
match_v = render(" MATCH (v{0}:V {{id: {0}}})", v_id_batch)
create_u = render(" CREATE (u)-[:R]->(v{0})", v_id_batch)
query = match_u + "".join(match_v) + "".join(create_u)
@ -79,7 +79,7 @@ def traverse_from_u_worker(u):
with argument_session(args) as session:
start_time = time.time()
assert_equal(
args.no_u * args.no_v - args.no_v, # cypher morphism
args.u_count * args.v_count - args.v_count, # cypher morphism
session.run("MATCH (u1:U {id: %s})-[e1]->(v:V)<-[e2]-(u2:U) "
"RETURN count(v) AS cnt" % u).data()[0]['cnt'],
"Number of traversed edges started "
@ -97,7 +97,7 @@ def traverse_from_v_worker(v):
with argument_session(args) as session:
start_time = time.time()
assert_equal(
args.no_u * args.no_v - args.no_u, # cypher morphism
args.u_count * args.v_count - args.u_count, # cypher morphism
session.run("MATCH (v1:V {id: %s})<-[e1]-(u:U)-[e2]->(v2:V) "
"RETURN count(u) AS cnt" % v).data()[0]['cnt'],
"Number of traversed edges started "
@ -122,11 +122,11 @@ def execution_handler():
cleanup_end_time - start_time)
# create U vertices
for b in batch(render('CREATE (:U {{id: {}}})', range(args.no_u)),
for b in batch(render('CREATE (:U {{id: {}}})', range(args.u_count)),
args.vertex_batch_size):
session.run(" ".join(b)).consume()
# create V vertices
for b in batch(render('CREATE (:V {{id: {}}})', range(args.no_v)),
for b in batch(render('CREATE (:V {{id: {}}})', range(args.v_count)),
args.vertex_batch_size):
session.run(" ".join(b)).consume()
vertices_create_end_time = time.time()
@ -135,10 +135,10 @@ def execution_handler():
vertices_create_end_time - cleanup_end_time)
# concurrent create execution & tests
with multiprocessing.Pool(args.no_workers) as p:
with multiprocessing.Pool(args.thread_count) as p:
create_edges_start_time = time.time()
for worker_id, create_time, time_unit, no_failures in \
p.map(create_u_v_edges, [i for i in range(args.no_u)]):
p.map(create_u_v_edges, [i for i in range(args.u_count)]):
log.info('Worker ID: %s; Create time: %s%s Failures: %s' %
(worker_id, create_time, time_unit, no_failures))
create_edges_end_time = time.time()
@ -148,7 +148,7 @@ def execution_handler():
# check total number of edges
assert_equal(
args.no_v * args.no_u,
args.v_count * args.u_count,
session.run(
'MATCH ()-[r]->() '
'RETURN count(r) AS cnt').data()[0]['cnt'],
@ -158,7 +158,7 @@ def execution_handler():
traverse_from_u_start_time = time.time()
for u, traverse_u_time, time_unit in \
p.map(traverse_from_u_worker,
[i for i in range(args.no_u)]):
[i for i in range(args.u_count)]):
log.info("U {id: %s} %s%s" % (u, traverse_u_time, time_unit))
traverse_from_u_end_time = time.time()
output_data.add_measurement(
@ -169,7 +169,7 @@ def execution_handler():
traverse_from_v_start_time = time.time()
for v, traverse_v_time, time_unit in \
p.map(traverse_from_v_worker,
[i for i in range(args.no_v)]):
[i for i in range(args.v_count)]):
log.info("V {id: %s} %s%s" % (v, traverse_v_time, time_unit))
traverse_from_v_end_time = time.time()
output_data.add_measurement(
@ -178,13 +178,13 @@ def execution_handler():
# check total number of vertices
assert_equal(
args.no_v + args.no_u,
args.v_count + args.u_count,
session.run('MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt'],
"Total number of vertices isn't correct! Expected: %s Actual: %s")
# check total number of edges
assert_equal(
args.no_v * args.no_u,
args.v_count * args.u_count,
session.run(
'MATCH ()-[r]->() RETURN count(r) AS cnt').data()[0]['cnt'],
"Total number of edges isn't correct! Expected: %s Actual: %s")
@ -197,8 +197,8 @@ def execution_handler():
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
output_data.add_status("stress_test_name", "bipartite")
output_data.add_status("number_of_vertices", args.no_u + args.no_v)
output_data.add_status("number_of_edges", args.no_u * args.no_v)
output_data.add_status("number_of_vertices", args.u_count + args.v_count)
output_data.add_status("number_of_edges", args.u_count * args.v_count)
output_data.add_status("vertex_batch_size", args.vertex_batch_size)
output_data.add_status("edge_batching", args.edge_batching)
output_data.add_status("edge_batch_size", args.edge_batch_size)

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
import multiprocessing
import os
import subprocess
import sys
# dataset calibrated for running on Apollo (approx. 1 min per test)
# each entry: "test" is a script in this directory, "options" are extra
# CLI flags forwarded to that script by run_test()
SMALL_DATASET = [
    {
        "test": "bipartite.py",
        "options": ["--u-count", "100", "--v-count", "100"],
    },
    {
        "test": "create_match.py",
        "options": ["--vertex-count", "40000", "--create-pack-size", "100"],
    },
    {
        "test": "long_running.py",
        "options": ["--vertex-count", "1000", "--edge-count", "1000", "--max-time", "2"],
    },
]

# dataset calibrated for running on daily stress instance
# bipartite and create_match run for approx. 15min
# long_running runs for approx. 8h (--max-time is in minutes: 480 min = 8 h)
LARGE_DATASET = [
    {
        "test": "bipartite.py",
        "options": ["--u-count", "300", "--v-count", "300"],
    },
    {
        "test": "create_match.py",
        "options": ["--vertex-count", "500000", "--create-pack-size", "500"],
    },
    {
        "test": "long_running.py",
        "options": ["--vertex-count", "100000", "--edge-count", "100000", "--max-time", "480"],
    },
]

# paths
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))  # tests/stress
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))  # repo root
BUILD_DIR = os.path.join(BASE_DIR, "build")  # default memgraph binary location
CONFIG_DIR = os.path.join(BASE_DIR, "config")  # default stress.conf location
def run_test(args, test, options):
    """Run a single stress test against a freshly started memgraph.

    Starts the memgraph binary, runs the given test script with the extra
    CLI `options`, then terminates memgraph.

    :param args: parsed CLI arguments (uses memgraph, python, config, verbose)
    :param test: test script filename, relative to this directory
    :param options: list of extra CLI flags forwarded to the test script
    :raises Exception: if memgraph or the test exits with a non-zero status
    """
    print("Running test '{}'".format(test))

    # get number of threads (overridable through the THREADS env var)
    if "THREADS" in os.environ:
        threads = os.environ["THREADS"]
    else:
        threads = multiprocessing.cpu_count()

    # start memgraph
    cwd = os.path.dirname(args.memgraph)
    cmd = [args.memgraph, "--num-workers=" + str(threads)]
    # subprocess.DEVNULL instead of an open("/dev/null") handle that would
    # never be closed (fd leak)
    stdout = None if args.verbose else subprocess.DEVNULL
    # extend the current environment rather than replacing it, so the child
    # still sees PATH, HOME, etc. in addition to MEMGRAPH_CONFIG
    env = dict(os.environ, MEMGRAPH_CONFIG=args.config)
    proc_mg = subprocess.Popen(cmd, cwd=cwd, stdout=stdout, env=env)

    # start test
    cmd = [args.python, os.path.join(SCRIPT_DIR, test), "--thread-count",
           str(threads)] + options
    stderr = None if args.verbose else subprocess.DEVNULL
    ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, stderr=stderr)

    # stop memgraph; it is expected to exit cleanly on SIGTERM
    proc_mg.terminate()
    ret_mg = proc_mg.wait()

    if ret_mg != 0:
        raise Exception("Memgraph binary returned non-zero ({})!".format(
            ret_mg))
    if ret_test.returncode != 0:
        raise Exception("Test '{}' binary returned non-zero ({})!".format(
            test, ret_test.returncode))
# parse arguments
parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.")
# path to the memgraph binary under test
parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR,
        "memgraph"))
# memgraph config file, handed to the server via the MEMGRAPH_CONFIG env var
parser.add_argument("--config", default = os.path.join(CONFIG_DIR,
        "stress.conf"))
# python interpreter used to run the test scripts (local virtualenv by default)
parser.add_argument("--python", default = os.path.join(SCRIPT_DIR,
        "ve3", "bin", "python3"), type = str)
# run LARGE_DATASET instead of SMALL_DATASET
parser.add_argument("--large-dataset", action = "store_const",
        const = True, default = False)
# when set, child process output is shown instead of discarded
parser.add_argument("--verbose", action = "store_const",
        const = True, default = False)
args = parser.parse_args()

# run tests
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
for test in dataset:
    # each dataset entry expands to run_test(args, test=..., options=...)
    run_test(args, **test)

print("Done!")

View File

@ -25,10 +25,10 @@ def parse_args():
parser = connection_argument_parser()
# specific
parser.add_argument('--no-workers', type=int,
parser.add_argument('--thread-count', type=int,
default=multiprocessing.cpu_count(),
help='Number of concurrent workers.')
parser.add_argument('--no-vertices', type=int, default=100,
parser.add_argument('--vertex-count', type=int, default=100,
help='Number of created vertices.')
parser.add_argument('--max-property-value', type=int, default=1000,
help='Maximum value of property - 1. A created node '
@ -51,30 +51,30 @@ def create_worker(worker_id):
:return: tuple (worker_id, create execution time, time unit)
'''
assert args.no_vertices > 0, 'Number of vertices has to be positive int'
assert args.vertex_count > 0, 'Number of vertices has to be positive int'
generated_xs = defaultdict(int)
create_query = ''
with argument_session(args) as session:
# create vertices
start_time = time.time()
for i in range(0, args.no_vertices):
for i in range(0, args.vertex_count):
random_number = random.randint(0, args.max_property_value - 1)
generated_xs[random_number] += 1
create_query += 'CREATE (:Label_T%s {x: %s}) ' % \
(worker_id, random_number)
# if full back or last item -> execute query
if (i + 1) % args.create_pack_size == 0 or \
i == args.no_vertices - 1:
i == args.vertex_count - 1:
session.run(create_query).consume()
create_query = ''
create_time = time.time()
# check total count
result_set = session.run('MATCH (n:Label_T%s) RETURN count(n) AS cnt' %
worker_id).data()[0]
assert result_set['cnt'] == args.no_vertices, \
assert result_set['cnt'] == args.vertex_count, \
'Create vertices Expected: %s Created: %s' % \
(args.no_vertices, result_set['cnt'])
(args.vertex_count, result_set['cnt'])
# check count per property value
for i, size in generated_xs.items():
result_set = session.run('MATCH (n:Label_T%s {x: %s}) '
@ -94,14 +94,14 @@ def create_handler():
session.run("MATCH (n) DETACH DELETE n").consume()
# concurrent create execution & tests
with multiprocessing.Pool(args.no_workers) as p:
with multiprocessing.Pool(args.thread_count) as p:
for worker_id, create_time, time_unit in \
p.map(create_worker, [i for i in range(args.no_workers)]):
p.map(create_worker, [i for i in range(args.thread_count)]):
log.info('Worker ID: %s; Create time: %s%s' %
(worker_id, create_time, time_unit))
# check total count
expected_total_count = args.no_workers * args.no_vertices
expected_total_count = args.thread_count * args.vertex_count
total_count = session.run(
'MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt']
assert total_count == expected_total_count, \

11
tests/stress/init Executable file
View File

@ -0,0 +1,11 @@
#!/bin/bash

# Set up the stress test environment (python3 virtualenv + dependencies).
# Safe to re-run: the virtualenv is only built when missing; remove the
# "ve3" directory to force a rebuild.

# abort on the first failing command so CI does not continue with a
# half-built virtualenv
set -e

# work relative to the directory this script lives in
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"

# setup virtual environment
if [ ! -d "ve3" ]; then
    virtualenv -p python3 ve3
    source ve3/bin/activate
    pip3 install -r requirements.txt
fi

View File

@ -178,7 +178,7 @@ RUNS.append(generate_run("quality_assurance", commands = commands,
"\.quality_assurance_status".format(
BASE_DIR_NAME)))
# macro benchmark tests
# build release paths
if mode == "release":
BUILD_RELEASE_DIR = os.path.join(BASE_DIR, "build")
else:
@ -186,6 +186,9 @@ else:
binary_release_name = run_cmd(["find", ".", "-maxdepth", "1", "-executable",
"-type", "f", "-name", "memgraph*"], BUILD_RELEASE_DIR).split("\n")[0][2:]
binary_release_path = os.path.join(BUILD_RELEASE_DIR, binary_release_name)
binary_release_link_path = os.path.join(BUILD_RELEASE_DIR, "memgraph")
# macro benchmark tests
macro_bench_path = os.path.join(BASE_DIR, "tests", "macro_benchmark")
stress_common = os.path.join(BASE_DIR, "tests", "stress", "common.py")
infile = create_archive("macro_benchmark", [binary_release_path,
@ -198,6 +201,15 @@ outfile_paths = "\./{}/tests/macro_benchmark/harness/\.harness_summary".format(
RUNS.append(generate_run("macro_benchmark", supervisor = supervisor,
arguments = args, infile = infile, outfile_paths = outfile_paths))
# stress tests
stress_path = os.path.join(BASE_DIR, "tests", "stress")
infile = create_archive("stress", [binary_release_path,
binary_release_link_path, stress_path, config_path],
cwd = WORKSPACE_DIR)
cmd = "cd {}/tests/stress\nTIMEOUT=600 ./continuous_integration " \
"--memgraph {}".format(BASE_DIR_NAME, binary_release_link_path)
RUNS.append(generate_run("stress", commands = cmd, infile = infile))
# store ARCHIVES and RUNS
store_metadata(OUTPUT_DIR, "archives", ARCHIVES)
store_metadata(OUTPUT_DIR, "runs", RUNS)