Implement measurements collection in tests

Summary:
Implement measurements collection in QA

Implement measurements collection in LDBC

Implement measurements collection in stress tests

Reviewers: mislav.bradac, buda, florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1087
Author: Matej Ferencevic
Date: 2017-12-28 16:27:30 +01:00
Parent: 859641cb0c
Commit: 82bede9a97

6 changed files with 136 additions and 14 deletions


@@ -15,3 +15,6 @@ TIMEOUT=3600 ./run_benchmark --run-db neo4j --create-index --thread-count $THREA
 TIMEOUT=3600 ./run_benchmark --run-db memgraph --create-index --thread-count $THREADS --result-file-prefix update --test-type updates --time-compression-ratio 1.5 --operation-count 200
 TIMEOUT=3600 ./run_benchmark --run-db neo4j --create-index --thread-count $THREADS --result-file-prefix update --test-type updates --time-compression-ratio 1.5 --operation-count 200
 ./ve3/bin/python3 ../../../tools/plot_ldbc_latency --results results/update-memgraph-scale_1-LDBC-results.json results/update-neo4j-scale_1-LDBC-results.json --logo-path plots/ldbc-logo.png --plot-title "Update queries, scale 1" --output plots/update-queries-scale_1.png
+
+# convert results to Apollo measurements
+./convert_results


@@ -0,0 +1,46 @@
+#!/usr/bin/env python3
+import json
+import os
+import sys
+
+# paths
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+RESULTS_DIR = os.path.join(SCRIPT_DIR, "results")
+MEASUREMENTS_PATH = os.path.join(SCRIPT_DIR, ".apollo_measurements")
+
+LDBC_TIME_FACTORS = {
+    "SECONDS": 1.0,
+    "MILLISECONDS": 1000.0,
+    "MICROSECONDS": 1000000.0,
+    "NANOSECONDS": 1000000000.0
+}
+
+def generate_measurements(path):
+    ret = ""
+    action, db, scale, ldbc, results = os.path.basename(path).split("-")
+    test_path = "{}.{}.{}".format(action, scale, db)
+    with open(path) as f:
+        results = json.load(f)
+    metrics = ["total_duration", "total_count", "throughput"]
+    divs = [LDBC_TIME_FACTORS[results["unit"]], 1, 1]
+    for metric, div in zip(metrics, divs):
+        ret += "{}.{} {}\n".format(test_path, metric, results[metric] / div)
+    for result in results["all_metrics"]:
+        name = result["name"]
+        run_time = dict(result["run_time"])
+        unit = run_time.pop("unit")
+        run_time.pop("name")
+        for key, value in run_time.items():
+            scale = LDBC_TIME_FACTORS[unit] if key != "count" else 1
+            ret += "{}.queries.{}.{} {}\n".format(test_path, name, key, value / scale)
+    return ret
+
+measurements = ""
+for fname in sorted(os.listdir(RESULTS_DIR)):
+    path = os.path.join(RESULTS_DIR, fname)
+    if not os.path.isfile(path): continue
+    if not path.endswith(".json"): continue
+    measurements += generate_measurements(path)
+
+with open(MEASUREMENTS_PATH, "w") as f:
+    f.write(measurements)
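Note: the measurement names above are derived from the result file name. A minimal runnable sketch of that naming scheme follows; the file name and JSON values are assumed for illustration, not taken from a real benchmark run.

# Hedged illustration of the "<action>.<scale>.<db>.<metric> <value>" lines
# produced above; fname and results are assumed example data.
fname = "update-memgraph-scale_1-LDBC-results.json"  # hypothetical file name
action, db, scale, _, _ = fname.split("-")
test_path = "{}.{}.{}".format(action, scale, db)  # "update.scale_1.memgraph"
results = {"unit": "MILLISECONDS", "total_duration": 1500.0}  # assumed JSON
div = {"MILLISECONDS": 1000.0}[results["unit"]]  # normalize durations to seconds
print("{}.total_duration {}".format(test_path, results["total_duration"] / div))
# prints: update.scale_1.memgraph.total_duration 1.5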


@@ -26,6 +26,7 @@ extra_suites = ["openCypher_M06"]
 results_folder = os.path.join("tck_engine", "results")
 suite_suffix = "memgraph-{}.json"
 qa_status_path = ".quality_assurance_status"
+measurements_path = ".apollo_measurements"


 def get_newest_path(folder, suffix):
@@ -43,15 +44,31 @@ def get_newest_path(folder, suffix):
     return os.path.join(folder, name_list.pop())


-def generate_status(suite, f, required = False):
+def generate_measurements(suite, result_path):
     """
     :param suite: Test suite name.
-    :param f: Json file with status report.
+    :param result_path: File path with json status report.
+    :return: Measurements string.
+    """
+    with open(result_path) as f:
+        result = json.load(f)
+    ret = ""
+    for i in ["total", "passed"]:
+        ret += "{}.{} {}\n".format(suite, i, result[i])
+    return ret
+
+
+def generate_status(suite, result_path, required = False):
+    """
+    :param suite: Test suite name.
+    :param result_path: File path with json status report.
     :param required: Adds status ticks to the message if required.
     :return: Status string.
     """
-    result = json.load(f)
+    with open(result_path) as f:
+        result = json.load(f)
     total = result["total"]
     passed = result["passed"]
     ratio = passed / total
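For context, a small runnable sketch of the lines generate_measurements produces; the suite name and counts are assumed, not real results.

# Assumed suite name and counts; shows the "<suite>.<total|passed> <value>"
# format built above.
suite = "memgraph_V1"                   # hypothetical suite name
result = {"total": 120, "passed": 118}  # assumed status report content
for i in ["total", "passed"]:
    print("{}.{} {}".format(suite, i, result[i]))
# prints:
#   memgraph_V1.total 120
#   memgraph_V1.passed 118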
@@ -101,22 +118,25 @@ if __name__ == "__main__":
             suite_suffix.format(memgraph_suite))
     log.info("Memgraph result path is {}".format(memgraph_result_path))

+    # measurements
+    measurements = ""
+
     # status table headers
     status_data = [["Suite", "Scenarios"]]

     # read internal scenarios
-    with open(memgraph_result_path) as f:
-        memgraph_status, memgraph_passed, memgraph_total \
-                = generate_status(memgraph_suite, f, required = True)
+    memgraph_status, memgraph_passed, memgraph_total = generate_status(
+            memgraph_suite, memgraph_result_path, required = True)
     status_data.append([memgraph_suite, memgraph_status])
+    measurements += generate_measurements(memgraph_suite, memgraph_result_path)

     # read extra scenarios
     for suite in extra_suites:
         result_path = get_newest_path(results_folder, suite_suffix.format(suite))
         log.info("Extra suite '{}' result path is {}".format(suite, result_path))
-        with open(result_path) as f:
-            suite_status, _, _ = generate_status(suite, f)
+        suite_status, _, _ = generate_status(suite, result_path)
         status_data.append([suite, suite_status])
+        measurements += generate_measurements(suite, result_path)

     # create status message
     qa_status_message = generate_remarkup(status_data)
@@ -125,7 +145,12 @@ if __name__ == "__main__":
     with open(qa_status_path, "w") as f:
         f.write(qa_status_message)

+    # create the measurements file
+    with open(measurements_path, "w") as f:
+        f.write(measurements)
+
     log.info("Status is generated in %s" % qa_status_path)
+    log.info("Measurements are generated in %s" % measurements_path)

     if memgraph_total != memgraph_passed:
         sys.exit("There is a problem with internal scenarios! %s"

tests/stress/.gitignore (new file)

@@ -0,0 +1 @@
+.long_running_stats


@@ -72,6 +72,12 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
 BUILD_DIR = os.path.join(BASE_DIR, "build")
 CONFIG_DIR = os.path.join(BASE_DIR, "config")
+MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
+
+# long running stats file
+STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats")
+SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
+LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])

 # get number of threads
 if "THREADS" in os.environ:
@@ -110,6 +116,8 @@ def run_test(args, test, options, timeout):
     runtime = time.time() - start
     print(" Done after {:.3f} seconds".format(runtime))

+    return runtime
+

 # parse arguments
 parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.")
@@ -153,9 +161,11 @@ def cleanup():
     proc_mg.wait()

 # run tests
+runtimes = {}
 dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
 for test in dataset:
-    run_test(args, **test)
+    runtime = run_test(args, **test)
+    runtimes[os.path.splitext(test["test"])[0]] = runtime

 # stop memgraph
 proc_mg.terminate()
@@ -163,4 +173,15 @@ ret_mg = proc_mg.wait()
 if ret_mg != 0:
     raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg))

+# measurements
+measurements = ""
+for key, value in runtimes.items():
+    measurements += "{}.runtime {}\n".format(key, value)
+with open(STATS_FILE) as f:
+    stats = f.read().split("\n")
+measurements += "long_running.queries.executed {}\n".format(stats[0])
+measurements += "long_running.queries.failed {}\n".format(stats[1])
+with open(MEASUREMENTS_FILE, "w") as f:
+    f.write(measurements)
+
 print("Done!")


@@ -33,6 +33,8 @@ DEFINE_int32(worker_count, 1,
 DEFINE_bool(global_queries, true,
             "If queries that modifiy globally should be executed sometimes");

+DEFINE_string(stats_file, "", "File into which to write statistics.");
+
 /**
  * Encapsulates a Graph and a Bolt session and provides CRUD op functions.
  * Also defines a run-loop for a generic exectutor, and a graph state
@@ -349,6 +351,16 @@ class GraphSession {
       }
     }
   }
+
+  uint64_t GetExecutedQueries() { return executed_queries_; }
+
+  uint64_t GetFailedQueries() {
+    uint64_t failed = 0;
+    for (const auto &item : query_failures_) {
+      failed += item.second;
+    }
+    return failed;
+  }
 };

 int main(int argc, char **argv) {
@@ -379,20 +391,34 @@ int main(int argc, char **argv) {
   // close client
   client.Close();

+  // sessions
+  std::vector<GraphSession> sessions;
+  for (int i = 0; i < FLAGS_worker_count; ++i) {
+    sessions.emplace_back(i);
+  }
+
   // workers
   std::vector<std::thread> threads;
   for (int i = 0; i < FLAGS_worker_count; ++i) {
-    threads.push_back(std::thread([&, i]() {
-      GraphSession session(i);
-      session.Run();
-    }));
+    threads.push_back(std::thread([&, i]() { sessions[i].Run(); }));
   }
   for (int i = 0; i < FLAGS_worker_count; ++i) {
     threads[i].join();
   }

+  if (FLAGS_stats_file != "") {
+    uint64_t executed = 0, failed = 0;
+    for (int i = 0; i < FLAGS_worker_count; ++i) {
+      executed += sessions[i].GetExecutedQueries();
+      failed += sessions[i].GetFailedQueries();
+    }
+    std::ofstream stream(FLAGS_stats_file);
+    stream << executed << std::endl << failed << std::endl;
+    LOG(INFO) << fmt::format("Written statistics to file: {}",
+                             FLAGS_stats_file);
+  }
+
   LOG(INFO) << "All query runners done";

   return 0;
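To close the loop: the stress runner passes --stats-file to this binary, which then writes the two-line stats file parsed above. A hedged sketch of such an invocation from Python; the binary path is assumed, and the flag spellings mirror the --stats-file form the runner uses.

# Hypothetical invocation; "./long_running" is an assumed binary path, and
# --worker-count corresponds to the DEFINE_int32(worker_count, ...) flag above.
import subprocess

subprocess.run(["./long_running", "--worker-count", "4",
                "--stats-file", ".long_running_stats"], check=True)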