Migrate harness to memgraph

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D532
Mislav Bradac 2017-07-10 18:17:18 +02:00
parent 99fffc23f7
commit d14accad07
61 changed files with 1071 additions and 0 deletions

.gitignore

@ -13,6 +13,7 @@
.idea
.ycm_extra_conf.pyc
Testing/
build
build/
cmake-build-*
cmake/DownloadProject/


@ -358,6 +358,8 @@ string(STRIP ${COMMIT_NO} COMMIT_NO)
string(STRIP ${COMMIT_HASH} COMMIT_HASH)
set(MEMGRAPH_BUILD_NAME
"memgraph_${COMMIT_NO}_${COMMIT_HASH}_${COMMIT_BRANCH}_${CMAKE_BUILD_TYPE}")
add_custom_target(memgraph_link_target ALL
  COMMAND ${CMAKE_COMMAND} -E create_symlink ${CMAKE_BINARY_DIR}/${MEMGRAPH_BUILD_NAME} ${CMAKE_BINARY_DIR}/memgraph)
# -----------------------------------------------------------------------------
# memgraph main executable


@ -0,0 +1,3 @@
{
  "iterations": 20
}


@ -0,0 +1 @@
MATCH (n) RETURN count(n), count(n.x)


@ -0,0 +1 @@
MATCH (n) RETURN min(n.x), max(n.x), avg(n.x)


@ -0,0 +1,8 @@
BATCH_SIZE = 50
VERTEX_COUNT = 500

for i in range(VERTEX_COUNT):
    print("CREATE (n%d {x: %d})" % (i, i))
    # batch CREATEs because we can't execute all at once
    if i != 0 and i % BATCH_SIZE == 0:
        print(";")

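The trailing ";" separators are what make batching work: the harness's QuerySuite.Loader (added later in this commit) splits loaded query text on ";" and executes each batch separately. A minimal sketch of that splitting, with a hypothetical captured output string:

# Sketch: how the harness breaks generator output into executable batches
# (mirrors QuerySuite.Loader._queries below; `output` is a hypothetical
# stand-in for this script's stdout).
output = "CREATE (n0 {x: 0})\nCREATE (n1 {x: 1})\n;\nCREATE (n2 {x: 2})"
batches = [q for q in map(str.strip, output.split(";")) if q]
assert len(batches) == 2  # two newline-separated runs of CREATE statements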

@ -0,0 +1,3 @@
{
  "iterations": 20
}


@ -0,0 +1 @@
MATCH (a), (b) CREATE (a)-[:Type]->(b)


@ -0,0 +1 @@
CREATE ()


@ -0,0 +1 @@
CREATE ()-[:Type]->()-[:Type]->()-[:Type]->()-[:Type]->()-[:Type]->()-[:Type]->()-[:Type]->()


@ -0,0 +1 @@
CREATE ()-[:Type]->()


@ -0,0 +1 @@
MATCH (n) DETACH DELETE n


@ -0,0 +1 @@
CREATE ()


@ -0,0 +1 @@
CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, p3: "Here is some text that is not extremely short", p4:"Short text", p5: 234.434, p6: 11.11, p7: false})


@ -0,0 +1,3 @@
{
  "iterations": 20
}


@ -0,0 +1,3 @@
from setup import create_edges
create_edges(400, 200)


@ -0,0 +1 @@
MATCH ()-[e]->() DELETE e


@ -0,0 +1,3 @@
from setup import create_vertices
create_vertices(200)


@ -0,0 +1,26 @@
""" This file does nothing on its own; it provides utilities
for the setup scripts of other scenarios. """
from random import randint

BATCH_SIZE = 50

def create_vertices(vertex_count):
    for vertex in range(vertex_count):
        print("CREATE (:Label {id: %d})" % vertex)
        if vertex != 0 and vertex % BATCH_SIZE == 0:
            print(";")

def create_edges(edge_count, vertex_count):
    """ vertex_count is the number of vertices already present in the graph """
    for edge in range(edge_count):
        print("MERGE ({id: %d})-[:Type]->({id: %d})" % (
            randint(0, vertex_count - 1), randint(0, vertex_count - 1)))
        if edge != 0 and edge % BATCH_SIZE == 0:
            print(";")

if __name__ == '__main__':
    raise Exception("This file is just for utilities, not for actual setup")


@ -0,0 +1,3 @@
from setup import create_vertices
create_vertices(200)


@ -0,0 +1 @@
MATCH (n) DELETE n


@ -0,0 +1,4 @@
from setup import create_vertices, create_edges
create_vertices(200)
create_edges(400, 200)


@ -0,0 +1 @@
MATCH (n) DETACH DELETE n


@ -0,0 +1,4 @@
RETURN
1 + 3, 2 - 1, 2 * 5, 5 / 2, 5 % 5, -5,
1.4 + 3.3, 6.2 - 5.4, 6.5 * 1.2, 6.6 / 1.2, 8.7 % 3.2, -6.6,
"Flo" + "Lasta"


@ -0,0 +1 @@
RETURN 1 < 2, 2 = 3, 6.66 < 10.2, 3.14 = 3.2, "Ana" < "Ivana", "Ana" = "Mmmmm", Null < Null, Null = Null


@ -0,0 +1,3 @@
{
  "iterations": 200
}


@ -0,0 +1 @@
RETURN true AND false, true OR false, true XOR false, NOT true


@ -0,0 +1,3 @@
{
  "iterations": 10
}


@ -0,0 +1 @@
print("MATCH (n)-[r1]->(m)-[r2]->(n) RETURN *")


@ -0,0 +1 @@
print("MATCH (n1)-[r1]->(n2)<-[r2]-(n3)-[r3]->(n4) RETURN *")


@ -0,0 +1 @@
print("MATCH (n)-[r]->(m) RETURN *")


@ -0,0 +1,3 @@
from setup import ID, rint, VERTEX_COUNT
print("MATCH (n)-[r]->(m) WHERE n.%s = %d AND m.%s = %d RETURN *" % (
    ID, rint(VERTEX_COUNT), ID, rint(VERTEX_COUNT)))


@ -0,0 +1,71 @@
"""
Generates a random graph with some configurable statistics.
"""
from random import randint

def rint(upper_bound_exclusive):
    return randint(0, upper_bound_exclusive - 1)

VERTEX_COUNT = 100
EDGE_COUNT = VERTEX_COUNT * 3

# numbers of *different* labels, edge types and properties
LABEL_COUNT = 10
EDGE_TYPE_COUNT = 10
MAX_LABELS = 3  # maximum number of labels in a vertex
MAX_PROPS = 4  # maximum number of properties in a vertex/edge

# some constants used in multiple files
LABEL_PREFIX = "Label"
PROP_PREFIX = "Property"
ID = "id"

def labels():
    return "".join(":%s%d" % (LABEL_PREFIX, rint(LABEL_COUNT))
                   for _ in range(randint(1, MAX_LABELS - 1)))

def properties(id):
    """ Generates a properties string with [0, MAX_PROPS) properties.
    Note that if PropX is generated, then all the PropY where Y < X
    are generated too. Thus most elements have Prop0, and the fewest
    have the highest-numbered property.
    """
    return "{%s: %d, %s}" % (ID, id, ",".join(
        ["%s%d: %d" % (PROP_PREFIX, prop_ind, rint(100))
         for prop_ind in range(randint(1, MAX_PROPS - 1))]))

def vertex(vertex_index):
    return "(%s %s)" % (labels(), properties(vertex_index))

def edge(edge_index):
    return "[:EdgeType%d %s]" % (rint(EDGE_TYPE_COUNT), properties(edge_index))

def main():
    # we batch CREATEs to speed creation up
    BATCH_SIZE = 50

    # create vertices
    for vertex_index in range(VERTEX_COUNT):
        print("CREATE %s" % vertex(vertex_index))
        if vertex_index != 0 and vertex_index % BATCH_SIZE == 0:
            print(";")
    # create edges
    for edge_index in range(EDGE_COUNT):
        print("MERGE (a%d {%s: %d})-%s->(b%d {%s: %d})" % (
            edge_index, ID, randint(0, VERTEX_COUNT - 1),
            edge(edge_index),
            edge_index, ID, randint(0, VERTEX_COUNT - 1)))
        if edge_index != 0 and edge_index % BATCH_SIZE == 0:
            print(";")

if __name__ == "__main__":
    main()

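Since label, property and edge-type choices are randomized, concrete output varies per run. A small sketch of what the generators above emit, assuming this file is importable as `setup`:

# Sketch: sample one generated vertex and edge pattern (output values are
# random; the trailing comments show plausible examples only).
from setup import vertex, edge

print("CREATE %s" % vertex(0))  # e.g. CREATE (:Label3:Label7 {id: 0, Property0: 42})
print(edge(0))                  # e.g. [:EdgeType5 {id: 0, Property0: 17}]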

@ -0,0 +1 @@
MATCH (n) DETACH DELETE n


@ -0,0 +1,2 @@
from setup import LABEL_COUNT, rint
print("MATCH (n:Label%d) RETURN n" % rint(LABEL_COUNT))


@ -0,0 +1,3 @@
from setup import LABEL_PREFIX, PROP_PREFIX, MAX_PROPS, LABEL_COUNT, rint
print("MATCH (n:%s%d {%s%d: %d}) RETURN n" % (
    LABEL_PREFIX, rint(LABEL_COUNT), PROP_PREFIX, rint(MAX_PROPS), rint(10)))


@ -0,0 +1,3 @@
from setup import PROP_PREFIX, MAX_PROPS, rint
print("MATCH (n {%s%d: %d}) RETURN n" % (
    PROP_PREFIX, rint(MAX_PROPS), rint(10)))


@ -0,0 +1,6 @@
from setup import VERTEX_COUNT
SKIP = VERTEX_COUNT // 4
LIMIT = VERTEX_COUNT // 4
print("MATCH (n) RETURN n ORDER BY n.id SKIP %d LIMIT %d" % (SKIP, LIMIT))


@ -0,0 +1,3 @@
{
  "iterations": 20
}


@ -0,0 +1 @@
MATCH (n) RETURN DISTINCT n.x


@ -0,0 +1 @@
MATCH (n) RETURN n LIMIT 10


@ -0,0 +1 @@
MATCH (n) RETURN n ORDER BY n.id


@ -0,0 +1,14 @@
BATCH_SIZE = 50
VERTEX_COUNT = 500
UNIQUE_VALUES = 50

def main():
    for i in range(VERTEX_COUNT):
        print("CREATE (n%d {x: %d, id: %d})" % (i, i % UNIQUE_VALUES, i))
        # batch CREATEs because we can't execute all at once
        if i != 0 and i % BATCH_SIZE == 0:
            print(";")

if __name__ == '__main__':
    main()


@ -0,0 +1,3 @@
from setup import VERTEX_COUNT
print("MATCH (n) RETURN n SKIP %d" % (VERTEX_COUNT // 2))


@ -0,0 +1 @@
MATCH (n) SET n:Label


@ -0,0 +1 @@
MATCH (n) REMOVE n:Label


@ -0,0 +1 @@
MATCH (n) SET n.property = 42


@ -0,0 +1 @@
MATCH (n) REMOVE n.property


@ -0,0 +1 @@
MATCH (n) REMOVE n:Label


@ -0,0 +1 @@
MATCH (n) SET n:Label


@ -0,0 +1 @@
MATCH (n) REMOVE n.property


@ -0,0 +1 @@
MATCH (n) SET n.property = 42


@ -0,0 +1,8 @@
BATCH_SIZE = 50
VERTEX_COUNT = 500

for i in range(VERTEX_COUNT):
    print("CREATE (n%d {x: %d})" % (i, i))
    # batch CREATEs because we can't execute all at once
    if i != 0 and i % BATCH_SIZE == 0:
        print(";")


@ -0,0 +1,100 @@
#!/usr/bin/python3
"""
A python script that launches the memgraph client,
executes a query and prints out a JSON dict of measurements
to stdout.

Takes a number of cmd-line arguments of the following structure:
    Positional, mandatory:
        - db_uri
        - queries (one or more)
    Named, optional:
        - encrypt

The dict that is printed out contains:
    - return_code of the client execution process
    - error_msg (empty if not applicable)
    - metadata dict

Note that 'metadata' is only valid if the return_code is 0.
"""

import sys
import time
import json
import io
from argparse import ArgumentParser
from contextlib import redirect_stderr

from neo4j.v1 import GraphDatabase, basic_auth

# string constants
RETURN_CODE = "return_code"
ERROR_MSG = "error_msg"
WALL_TIME = "wall_time"

def _prepare_for_json(obj):
    if isinstance(obj, dict):
        return {k: _prepare_for_json(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_prepare_for_json(elem) for elem in obj]
    if isinstance(obj, (str, int, float, type(None))):
        return obj
    return None

def _print_dict(d):
    print(json.dumps(_prepare_for_json(d), indent=2))

def main():
    argp = ArgumentParser("Bolt client execution process")
    # positional args
    argp.add_argument("db_uri")
    argp.add_argument("queries", nargs="+")
    # named, optional
    argp.add_argument("--encrypt", action="store_true")
    # parse args, ensure that stdout is not polluted by argument parsing
    try:
        f = io.StringIO()
        with redirect_stderr(f):
            args = argp.parse_args()
    except:
        _print_dict({RETURN_CODE: 1, ERROR_MSG: "Invalid cmd-line arguments"})
        sys.exit(1)

    driver = GraphDatabase.driver(
        args.db_uri,
        auth=basic_auth("", ""),
        encrypted=args.encrypt)
    session = driver.session()

    # execute the queries
    metadatas = []
    start = time.time()
    for query in args.queries:
        result = session.run(query)
        metadatas.append(result.summary().metadata)
    end = time.time()
    delta_time = end - start

    _print_dict({
        RETURN_CODE: 0,
        WALL_TIME: delta_time / float(len(args.queries)),
        "metadatas": metadatas
    })

    session.close()
    driver.close()

if __name__ == '__main__':
    main()

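For reference, a sketch of driving this client the way the harness below does, via a subprocess (the URI and query are placeholders; a running database must be listening on the Bolt port):

# Sketch: invoke bolt_client.py and read back its JSON report.
import json
import subprocess

out = subprocess.check_output([
    "python3", "bolt_client.py",
    "bolt://localhost:7687",      # db_uri (placeholder)
    "MATCH (n) RETURN count(n)",  # one or more queries
])
report = json.loads(out)
print(report["return_code"], report["wall_time"])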

@ -0,0 +1,448 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
import os
from os import path
import requests
import time
import itertools
import json
from subprocess import check_output
from argparse import ArgumentParser
from collections import OrderedDict
from collections import defaultdict

import jail_faker as jail
from bolt_client import WALL_TIME
from perf import Perf

log = logging.getLogger(__name__)

class QuerySuite:
    """
    Executes a Query-based benchmark scenario. Query-based scenarios
    consist of setup steps (Cypher queries) executed before the benchmark,
    a single Cypher query that is benchmarked, and teardown steps
    (Cypher queries) executed after the benchmark.
    """
    # what the QuerySuite can work with
    KNOWN_KEYS = {"config", "setup", "itersetup", "run", "iterteardown",
                  "teardown"}

    summary = "Summary:\n{:>30}{:>30}{:>30}{:>30}{:>30}\n".format(
        "scenario_name", "query_parsing_time", "query_planning_time",
        "query_plan_execution_time", WALL_TIME)

    def __init__(self, args):
        argp = ArgumentParser(description=__doc__)
        argp.add_argument("--perf", help="Run perf on memgraph binary.",
                          action="store_true")
        args, _ = argp.parse_known_args(args)
        self.perf = Perf() if args.perf else None

    class Loader:
        """
        Loads file contents. Supported types are:
            .py - executable that prints out Cypher queries
            .cypher - contains Cypher queries in textual form
            .json - contains a configuration

        A Loader object is callable.
        A call to it returns a generator that yields loaded data
        (Cypher queries, configuration). In that sense one
        Loader is reusable. The generator approach makes it possible
        to generate different queries each time a .py file is executed.
        """
        def __init__(self, file_path):
            self.file_path = file_path

        def _queries(self, data):
            """ Helper function for breaking down and filtering queries """
            for element in filter(None, map(str.strip, data.split(";"))):
                yield element

        def __call__(self):
            """ Yields queries found in the given file_path one by one """
            log.debug("Generating queries from file_path: %s",
                      self.file_path)
            _, extension = path.splitext(self.file_path)
            if extension == ".cypher":
                with open(self.file_path) as f:
                    return self._queries(f.read())
            elif extension == ".py":
                return self._queries(check_output(
                    ["python3", self.file_path]).decode("ascii"))
            elif extension == ".json":
                with open(self.file_path) as f:
                    return [json.load(f)].__iter__()
            else:
                raise Exception("Unsupported filetype {}".format(extension))

        def __repr__(self):
            return "(QuerySuite.Loader<%s>)" % self.file_path

    @staticmethod
    def scenarios(args):
        """
        Scans through the folder structure starting from groups_root and
        loads query scenarios.

        Expected folder structure is:
            groups_root/
                groupname1/
                    config.json
                    setup.FILE_TYPE
                    teardown.FILE_TYPE
                    itersetup.FILE_TYPE
                    iterteardown.FILE_TYPE
                    scenario1.config.json
                    scenario1.run.FILE_TYPE (mandatory)
                    scenario1.setup.FILE_TYPE
                    scenario1.teardown.FILE_TYPE
                    scenario1.itersetup.FILE_TYPE
                    scenario1.iterteardown.FILE_TYPE
                    scenario2...
                    ...
                groupname2/
                    ...

        Per-scenario configs (setup, teardown, itersetup, iterteardown)
        override group configs for that scenario. Group configs must have
        one extension (.FILE_TYPE) and scenario configs must have two
        extensions (.scenario_name.FILE_TYPE). See the `Loader`
        documentation for the supported file types.

        Args:
            args: additional args parsed by this function
            group_paths: str, root folder that contains group folders

        Return:
            {group: [(scenario_name, {config_name: loader})]}
        """
        argp = ArgumentParser("QuerySuite.scenarios argument parser")
        argp.add_argument("--query-scenarios-root", default=path.join(
            path.dirname(path.dirname(path.realpath(__file__))), "groups"),
            dest="root")
        args, _ = argp.parse_known_args()
        log.info("Loading query scenarios from root: %s", args.root)

        def fill_config_dict(config_dict, base, config_files):
            for config_file in config_files:
                log.debug("Processing config file %s", config_file)
                config_name = config_file.split(".")[-2]
                config_dict[config_name] = QuerySuite.Loader(
                    path.join(base, config_file))

            # validate that the scenario does not contain any illegal
            # keys (defense against typos in file naming)
            unknown_keys = set(config_dict) - QuerySuite.KNOWN_KEYS
            if unknown_keys:
                raise Exception("Unknown QuerySuite config elements: '%r'" %
                                unknown_keys)

        def dir_content(root, predicate):
            return [p for p in os.listdir(root)
                    if predicate(path.join(root, p))]

        group_scenarios = OrderedDict()
        for group in dir_content(args.root, path.isdir):
            log.info("Loading group: '%s'", group)

            group_scenarios[group] = []
            files = dir_content(path.join(args.root, group),
                                path.isfile)

            # process group default config
            group_config = {}
            fill_config_dict(group_config, path.join(args.root, group),
                             [f for f in files if f.count(".") == 1])

            # group files on scenario
            for scenario_name, scenario_files in itertools.groupby(
                    filter(lambda f: f.count(".") == 2, sorted(files)),
                    lambda x: x.split(".")[0]):
                log.info("Loading scenario: '%s'", scenario_name)
                scenario = dict(group_config)
                fill_config_dict(scenario,
                                 path.join(args.root, group),
                                 scenario_files)
                group_scenarios[group].append((scenario_name, scenario))
                log.debug("Loaded config for scenario '%s'\n%r",
                          scenario_name, scenario)

        return group_scenarios

    def run(self, scenario, scenario_name, runner):
        log.debug("QuerySuite.run() with scenario: %s", scenario)
        scenario_config = scenario.get("config")
        scenario_config = next(scenario_config()) if scenario_config else {}

        def execute(config_name):
            queries = scenario.get(config_name)
            return runner.execute(queries()) if queries else None

        measurements = []
        measurement_sums = defaultdict(float)

        def add_measurement(dictionary, iteration, key):
            if key in dictionary:
                measurement = {"target": key, "value": dictionary[key],
                               "unit": "s", "type": "time",
                               "iteration": iteration}
                measurements.append(measurement)
                try:
                    measurement_sums[key] += float(dictionary[key])
                except:
                    pass

        pid = runner.start()
        execute("setup")

        # warmup phase
        for _ in range(min(scenario_config.get("iterations", 1),
                           scenario_config.get("warmup", 5))):
            execute("itersetup")
            execute("run")
            execute("iterteardown")

        if self.perf:
            self.perf.start(pid)

        # TODO per scenario/run runner configuration
        num_iterations = scenario_config.get("iterations", 1)
        for iteration in range(num_iterations):
            # TODO if we didn't have the itersetup it would be trivial
            # to move the iteration loop into the bolt_client script;
            # we would not have to start and stop the client for each
            # iteration and it would most likely run faster
            execute("itersetup")
            # TODO measure CPU time (expose it from the runner)
            run_result = execute("run")
            assert len(run_result.get("metadatas", [])), \
                "Scenario run must have exactly one query"
            add_measurement(run_result, iteration, WALL_TIME)
            add_measurement(run_result["metadatas"][0], iteration,
                            "query_parsing_time")
            add_measurement(run_result["metadatas"][0], iteration,
                            "query_plan_execution_time")
            add_measurement(run_result["metadatas"][0], iteration,
                            "query_planning_time")
            execute("iterteardown")

        if self.perf:
            self.perf.stop()

        # TODO value outlier detection and warning across iterations
        execute("teardown")
        runner.stop()
        self.append_scenario_summary(scenario_name, measurement_sums,
                                     num_iterations)
        return measurements

    def append_scenario_summary(self, scenario_name, measurement_sums,
                                num_iterations):
        self.summary += "{:>30}".format(scenario_name)
        for key in ("query_parsing_time", "query_planning_time",
                    "query_plan_execution_time", WALL_TIME):
            if key not in measurement_sums:
                time = "-"
            else:
                time = "{:.10f}".format(measurement_sums[key] / num_iterations)
            self.summary += "{:>30}".format(time)
        self.summary += "\n"

    def runners(self):
        """ Which runners can execute a QuerySuite scenario """
        return ["MemgraphRunner"]

    def groups(self):
        """ Which groups can be executed by a QuerySuite scenario """
        return ["create", "match", "expression", "aggregation", "return",
                "update", "delete", "hardcoded"]

class MemgraphRunner:
    """
    Knows how to start and stop Memgraph (backend) and a client frontend
    (bolt), and how to execute a Cypher query.
    Execution returns benchmarking data (execution times, memory
    usage etc.).
    """
    def __init__(self, args):
        """
        Creates and configures MemgraphRunner.

        Args:
            args: args to pass to ArgumentParser
        """
        log.info("Initializing MemgraphRunner with arguments %r", args)

        # parse arguments
        argp = ArgumentParser("MemgraphRunnerArgumentParser")
        argp.add_argument("--MemgraphRunnerBin",
                          default=os.path.join(os.path.dirname(__file__),
                                               "../../../build/memgraph"))
        argp.add_argument("--MemgraphRunnerConfig", required=False)
        argp.add_argument("--MemgraphRunnerURI",
                          default="bolt://localhost:7687")
        argp.add_argument("--MemgraphRunnerEncryptBolt", action="store_true")
        self.args, _ = argp.parse_known_args(args)

        self.memgraph_bin = jail.get_process()
        self.bolt_client = jail.get_process()

    def start(self):
        log.info("MemgraphRunner.start")
        environment = os.environ.copy()
        if self.args.MemgraphRunnerConfig:
            environment["MEMGRAPH_CONFIG"] = self.args.MemgraphRunnerConfig
        self.memgraph_bin.run(self.args.MemgraphRunnerBin, env=environment)
        # TODO change to a check via SIGUSR
        time.sleep(1.0)
        return self.memgraph_bin.get_pid()

    def execute(self, queries):
        log.debug("MemgraphRunner.execute('%s')", str(queries))
        client_args = [path.join(path.dirname(__file__), "bolt_client.py")]
        client_args.append(self.args.MemgraphRunnerURI)
        client_args += queries
        if self.args.MemgraphRunnerEncryptBolt:
            client_args.append("--encrypt")
        # TODO make the timeout configurable per query or something
        return_code = self.bolt_client.run_and_wait(
            "python3", client_args, timeout=120)
        if return_code != 0:
            with open(self.bolt_client.get_stderr()) as f:
                stderr = f.read()
            log.error("MemgraphRunner - error while executing queries '%s'. "
                      "Failed with return_code %d and stderr:\n%s",
                      str(queries), return_code, stderr)
            raise Exception("BoltClient execution failed")
        with open(self.bolt_client.get_stdout()) as f:
            return json.loads(f.read())

    def stop(self):
        log.info("MemgraphRunner.stop")
        self.bolt_client.send_signal(jail.SIGKILL)
        self.bolt_client.wait()
        self.memgraph_bin.send_signal(jail.SIGKILL)
        self.memgraph_bin.wait()

def send_data(storage_url, data_type, payload):
    log.info("Sending %d elements of type '%s' to storage at '%s'",
             len(payload), data_type, storage_url)
    log.debug("Sending payload:\n%s", json.dumps(payload, indent=2))
    r = requests.post("%s/store/%s" % (storage_url, data_type), json=payload)
    if r.status_code != 200:
        raise Exception("Unable to send %s data." % data_type)
    log.debug("Storage server response:\n%s", r.json())
    if len(r.json()) == 0:
        raise Exception("Invalid storage server response")
    return r.json()

def send_data_one(storage_url, data_type, payload):
    return send_data(storage_url, data_type, [payload])[0]

def parse_known_args():
    argp = ArgumentParser(description=__doc__)
    # positional, mandatory args
    argp.add_argument("suite", help="Suite to run.")
    argp.add_argument("runner", help="Engine to use.")
    # named, optional arguments
    argp.add_argument("--storage-url", help="URL of the storage server")
    argp.add_argument("--groups", nargs="+", help="Groups to run. If none "
                      "are provided, all available groups are run.")
    argp.add_argument("--scenarios", nargs="+", help="Scenarios to run. If "
                      "none are provided, all available are run.")
    argp.add_argument("--logging", default="INFO", choices=["INFO", "DEBUG"],
                      help="Logging level")
    argp.add_argument("--additional-run-fields", default={}, type=json.loads,
                      help="Additional fields to add to the 'run', in JSON")
    return argp.parse_known_args()

def main():
    args, remaining_args = parse_known_args()
    if args.logging:
        logging.basicConfig(level=args.logging)
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        logging.getLogger("neo4j.bolt").setLevel(logging.WARNING)
    log.info("Memgraph benchmark suite harness")
    log.info("Executing for suite '%s', runner '%s', storage '%s'",
             args.suite, args.runner, args.storage_url)

    # Create suite
    suites = {"QuerySuite": QuerySuite}
    if args.suite not in suites:
        raise Exception(
            "Suite '{}' isn't registered. Registered suites are: {}".format(
                args.suite, suites))
    suite = suites[args.suite](remaining_args)

    # Load scenarios
    group_scenarios = suites[args.suite].scenarios(remaining_args)
    log.info("Loaded %d groups, with a total of %d scenarios",
             len(group_scenarios),
             sum([len(x) for x in group_scenarios.values()]))

    # Create runner
    runners = {"MemgraphRunner": MemgraphRunner}
    # TODO if the runner argument is made optional, execute all runners
    if args.runner not in suite.runners():
        raise Exception("Runner '{}' not registered for suite '{}'".format(
            args.runner, args.suite))
    runner = runners[args.runner](remaining_args)

    # Validate groups (if provided)
    if args.groups:
        for group in args.groups:
            if group not in suite.groups():
                raise Exception("Group '{}' isn't registered for suite '{}'".
                                format(group, suite))
        groups = args.groups
    else:
        # No groups provided, use all of the suite's groups
        groups = suite.groups()

    # TODO enable scenario filtering on regex
    filtered_scenarios = OrderedDict()
    for group, scenarios in group_scenarios.items():
        if group not in groups:
            log.info("Skipping group '%s'", group)
            continue
        for scenario_name, scenario in scenarios:
            if args.scenarios and scenario_name not in args.scenarios:
                continue
            filtered_scenarios[(group, scenario_name)] = scenario

    if len(filtered_scenarios) == 0:
        log.info("No scenarios to execute")
        return

    log.info("Executing %d scenarios", len(filtered_scenarios))
    results = []
    for (group, scenario_name), scenario in filtered_scenarios.items():
        log.info("Executing group.scenario '%s.%s' with elements %s",
                 group, scenario_name, list(scenario.keys()))
        for iter_result in suite.run(scenario, scenario_name, runner):
            iter_result["group"] = group
            iter_result["scenario"] = scenario_name
            results.append(iter_result)

    run = dict()
    run["suite"] = args.suite
    run["runner"] = runner.__class__.__name__
    run["runner_config"] = vars(runner.args)
    run.update(args.additional_run_fields)

    if args.storage_url is not None:
        run_uuid = send_data_one(args.storage_url, "run", run)
        for result in results:
            result["run"] = run_uuid
        send_data(args.storage_url, "measurement", results)

    print("\n\n{}\n".format(suite.summary))

if __name__ == "__main__":
    main()

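A typical invocation, assuming the file above is saved as harness.py (the flag names come from parse_known_args; the group list is illustrative):

# Sketch: run the QuerySuite with the MemgraphRunner on two groups.
import subprocess

subprocess.check_call([
    "python3", "harness.py", "QuerySuite", "MemgraphRunner",
    "--groups", "create", "match",
    "--logging", "DEBUG",
])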

@ -0,0 +1,247 @@
#!/usr/bin/python3
import atexit
import os
import resource
import shutil
import subprocess
import sys
import threading
import time
import uuid
from signal import *

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
TEMP_DIR = os.path.join(SCRIPT_DIR, ".temp")
STORAGE_DIR = os.path.join(SCRIPT_DIR, ".storage")

class ProcessException(Exception):
    pass

class Process:
    def __init__(self, tid):
        self._tid = tid
        self._proc = None
        self._ticks_per_sec = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
        self._page_size = os.sysconf(os.sysconf_names["SC_PAGESIZE"])
        self._usage = {}
        self._files = []

    def run(self, binary, args=[], env={}, timeout=120, stdin="/dev/null"):
        # don't start a new process if one is already running
        if self._proc != None and self._proc.returncode == None:
            raise ProcessException
        # clear previous usage
        self._usage = {}
        # create exe list
        exe = [binary] + args
        # temporary stdout and stderr files
        stdout = self._temporary_file("stdout")
        stderr = self._temporary_file("stderr")
        self._files = [stdout, stderr]
        # set environment variables
        keys = ["PATH", "HOME", "USER", "LANG", "PWD"]
        for key in keys:
            env[key] = os.environ[key]
        # set start time
        self._start_time = time.time()
        self._timeout = timeout
        # start process
        self._proc = subprocess.Popen(exe, env=env,
                                      stdin=open(stdin, "r"),
                                      stdout=open(stdout, "w"),
                                      stderr=open(stderr, "w"))

    def wait(self):
        if self._proc == None:
            raise ProcessException()
        self._proc.wait()
        return self._proc.returncode

    def run_and_wait(self, *args, **kwargs):
        self.run(*args, **kwargs)
        return self.wait()

    def get_pid(self):
        if self._proc == None:
            raise ProcessException
        return self._proc.pid

    def get_usage(self):
        if self._proc == None:
            raise ProcessException
        return self._usage

    def get_status(self):
        if self._proc == None:
            raise ProcessException
        self._proc.poll()
        return self._proc.returncode

    def get_stdout(self):
        if self._proc == None or self._proc.returncode == None:
            raise ProcessException
        return self._files[0]

    def get_stderr(self):
        if self._proc == None or self._proc.returncode == None:
            raise ProcessException
        return self._files[1]

    def send_signal(self, signal):
        if self._proc == None:
            raise ProcessException
        self._proc.send_signal(signal)

    def _temporary_file(self, name):
        return os.path.join(TEMP_DIR,
                            ".".join([name, str(uuid.uuid4()), "dat"]))

    def _set_usage(self, val, name, only_value=False):
        self._usage[name] = val
        if only_value:
            return
        maxname = "max_" + name
        maxval = val
        if maxname in self._usage:
            maxval = self._usage[maxname]
        self._usage[maxname] = max(maxval, val)

    def _do_background_tasks(self):
        self._update_usage()
        self._watchdog()

    def _update_usage(self):
        if self._proc == None:
            return
        try:
            with open("/proc/{}/stat".format(self._proc.pid)) as f:
                data_stat = f.read().split()
            with open("/proc/{}/statm".format(self._proc.pid)) as f:
                data_statm = f.read().split()
        except:
            return
        # for a description of these fields see: man proc; man times
        cpu_time = sum(map(lambda x: int(x) / self._ticks_per_sec,
                           data_stat[13:17]))
        self._set_usage(cpu_time, "cpu_usage", only_value=True)
        self._set_usage(int(data_stat[19]), "threads")
        mem_vm, mem_res, mem_shr = map(
            lambda x: int(x) * self._page_size // 1024, data_statm[:3])
        self._set_usage(mem_res, "memory")

    def _watchdog(self):
        if self._proc == None or self._proc.returncode != None:
            return
        if time.time() - self._start_time < self._timeout:
            return
        sys.stderr.write("Timeout of {}s reached, sending "
                         "SIGKILL to {}!\n".format(self._timeout, self))
        self.send_signal(SIGKILL)
        self.get_status()

    def __repr__(self):
        return "Process(id={})".format(self._tid)

# private methods -------------------------------------------------------------

PROCESSES_NUM = 8
_processes = [Process(i) for i in range(1, PROCESSES_NUM + 1)]
_last_process = 0
_thread_run = True

def _usage_updater():
    while True:
        for proc in _processes:
            proc._do_background_tasks()
        time.sleep(0.1)

_thread = threading.Thread(target=_usage_updater, daemon=True)
_thread.start()

if not os.path.exists(STORAGE_DIR):
    os.mkdir(STORAGE_DIR)
if os.path.exists(TEMP_DIR):
    shutil.rmtree(TEMP_DIR)
os.mkdir(TEMP_DIR)

_storage_file = os.path.join(STORAGE_DIR,
                             time.strftime("%Y%m%d%H%M%S") + ".json")

@atexit.register
def cleanup():
    for proc in _processes:
        if proc._proc == None:
            continue
        proc.send_signal(SIGKILL)
        proc.get_status()
    shutil.rmtree(TEMP_DIR)

# end of private methods ------------------------------------------------------

def get_process():
    global _last_process
    if _last_process < PROCESSES_NUM:
        proc = _processes[_last_process]
        _last_process += 1
        return proc
    return None

# TODO: this still needs to be implemented
def store_data(data):
    # TODO: assert that data contains certain keys,
    # namely keys like value, type, etc.:
    #   unit - mandatory
    #   type - mandatory
    #   target - (setup, teardown, ...)
    #   iter
    #   value - mandatory
    #   scenario
    #   group
    #   status ?
    #   cpu_clock - mandatory?
    #   timestamp - mandatory, set automatically
    #   query
    #   engine
    #   engine_config
    # TODO: store this as valid JSON,
    # which means it needs to be a list of dictionaries
    pass

if __name__ == "__main__":
    proc = get_process()
    proc.run("/home/matej/memgraph/jail/api/tester", timeout=1500)
    # proc.run("/home/matej/memgraph/memgraph/build/memgraph_829_97638d3_dev_debug", env={"MEMGRAPH_CONFIG": "/home/matej/memgraph/memgraph/config/memgraph.yaml"})
    time.sleep(1.0)
    print("usage1:", proc.get_usage())
    print("status:", proc.get_status())
    # proc.send_signal(SIGTERM)
    cnt = 0
    while proc.get_status() == None:
        usage = proc.get_usage()
        usage_str = " "
        for key in sorted(usage.keys()):
            usage_str += key + (": %10.3f" % usage[key]) + "; "
        print(usage_str)
        time.sleep(0.1)
        cnt += 1
    proc.send_signal(SIGTERM)
    print("status", proc.get_status())
    print("usage2", proc.get_usage())
    while proc.get_status() == None:
        print("waiting for the process to die...")
        time.sleep(0.1)
    print("stdout 3:", proc.get_stdout())

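A minimal usage sketch of the process-jail API above ("/bin/sleep" is a stand-in binary; note that get_usage() can still be empty right after startup, since the background thread samples every 0.1 s):

# Sketch: run a binary under the jail and inspect its usage statistics.
import time
import jail_faker as jail

proc = jail.get_process()
proc.run("/bin/sleep", args=["2"], timeout=10)
time.sleep(0.5)
print("usage:", proc.get_usage())    # cpu_usage, threads, memory, max_* keys
print("return code:", proc.wait())
print("stdout file:", proc.get_stdout())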

@ -0,0 +1,32 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
from pathlib import Path
import subprocess
import signal

class Perf:
    def __init__(self):
        self.first = True
        self.max_frequency = Path(
            "/proc/sys/kernel/perf_event_max_sample_rate").read_text().strip()
        # Check if lbr is available.
        status = subprocess.call(
            "perf record --call-graph=lbr -a -g sleep 0.0000001".split())
        self.call_graph_technique = "lbr" if not status else "dwarf"

    def start(self, pid, frequency=None):
        if frequency is None:
            frequency = self.max_frequency
        append = "-A" if not self.first else ""
        self.first = False
        perf_command = "perf record --call-graph={} -F {} -p {} -g {}".format(
            self.call_graph_technique, frequency, pid, append).split()
        self.perf_proc = subprocess.Popen(perf_command)

    def stop(self):
        self.perf_proc.send_signal(signal.SIGUSR1)
        self.perf_proc.wait()

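A usage sketch for the Perf wrapper above (pid 1234 is a placeholder; perf(1) must be installed and permitted to profile the target):

# Sketch: profile an already-running process for a short window.
import time
from perf import Perf

perf = Perf()     # probes once whether LBR call graphs are available
perf.start(1234)  # attaches `perf record` to the target pid
time.sleep(5)     # ... the benchmarked workload runs ...
perf.stop()       # signals the perf process and waits for it to exit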
tests/benchmark_infra/init.sh

@ -0,0 +1,11 @@
#!/bin/bash

if [ ! -d ve3 ]; then
    virtualenv -p python3 ve3 || exit 1
    source ve3/bin/activate
    pip install -r requirements.txt || exit 1
else
    source ve3/bin/activate
fi

# TODO: apt-get install python3-tk


@ -0,0 +1,19 @@
appdirs==1.4.3
click==6.7
cycler==0.10.0
Flask==0.12.1
itsdangerous==0.24
Jinja2==2.9.6
MarkupSafe==1.0
matplotlib==2.0.1
neo4j-driver==1.2.1
numpy==1.12.1
packaging==16.8
pyparsing==2.2.0
python-dateutil==2.6.0
pytz==2017.2
requests==2.13.0
scipy==0.19.0
six==1.10.0
Werkzeug==0.12.1