diff --git a/tests/public_benchmark/ldbc/continuous_integration b/tests/public_benchmark/ldbc/continuous_integration
index 694ec45f4..ad99aecf0 100644
--- a/tests/public_benchmark/ldbc/continuous_integration
+++ b/tests/public_benchmark/ldbc/continuous_integration
@@ -15,3 +15,6 @@ TIMEOUT=3600 ./run_benchmark --run-db neo4j --create-index --thread-count $THREA
 TIMEOUT=3600 ./run_benchmark --run-db memgraph --create-index --thread-count $THREADS --result-file-prefix update --test-type updates --time-compression-ratio 1.5 --operation-count 200
 TIMEOUT=3600 ./run_benchmark --run-db neo4j --create-index --thread-count $THREADS --result-file-prefix update --test-type updates --time-compression-ratio 1.5 --operation-count 200
 ./ve3/bin/python3 ../../../tools/plot_ldbc_latency --results results/update-memgraph-scale_1-LDBC-results.json results/update-neo4j-scale_1-LDBC-results.json --logo-path plots/ldbc-logo.png --plot-title "Update queries, scale 1" --output plots/update-queries-scale_1.png
+
+# convert results to Apollo measurements
+./convert_results
diff --git a/tests/public_benchmark/ldbc/convert_results b/tests/public_benchmark/ldbc/convert_results
new file mode 100755
index 000000000..8a642b457
--- /dev/null
+++ b/tests/public_benchmark/ldbc/convert_results
@@ -0,0 +1,46 @@
+#!/usr/bin/env python3
+import json
+import os
+import sys
+
+# paths
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+RESULTS_DIR = os.path.join(SCRIPT_DIR, "results")
+MEASUREMENTS_PATH = os.path.join(SCRIPT_DIR, ".apollo_measurements")
+
+LDBC_TIME_FACTORS = {
+    "SECONDS": 1.0,
+    "MILLISECONDS": 1000.0,
+    "MICROSECONDS": 1000000.0,
+    "NANOSECONDS": 1000000000.0
+}
+
+def generate_measurements(path):
+    ret = ""
+    action, db, scale, ldbc, results = os.path.basename(path).split("-")
+    test_path = "{}.{}.{}".format(action, scale, db)
+    with open(path) as f:
+        results = json.load(f)
+    metrics = ["total_duration", "total_count", "throughput"]
+    divs = [LDBC_TIME_FACTORS[results["unit"]], 1, 1]
+    for metric, div in zip(metrics, divs):
+        ret += "{}.{} {}\n".format(test_path, metric, results[metric] / div)
+    for result in results["all_metrics"]:
+        name = result["name"]
+        run_time = dict(result["run_time"])
+        unit = run_time.pop("unit")
+        run_time.pop("name")
+        for key, value in run_time.items():
+            scale = LDBC_TIME_FACTORS[unit] if key != "count" else 1
+            ret += "{}.queries.{}.{} {}\n".format(test_path, name, key, value / scale)
+    return ret
+
+measurements = ""
+for fname in sorted(os.listdir(RESULTS_DIR)):
+    path = os.path.join(RESULTS_DIR, fname)
+    if not os.path.isfile(path): continue
+    if not path.endswith(".json"): continue
+    measurements += generate_measurements(path)
+
+with open(MEASUREMENTS_PATH, "w") as f:
+    f.write(measurements)
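For orientation, `convert_results` derives the measurement prefix from the result file name and normalizes all timings to seconds. A minimal sketch of the mapping, using the `update-memgraph-scale_1-LDBC-results.json` name from the CI script above; the summary fields are hypothetical stand-ins for what the LDBC driver writes:

```python
# File name encodes the measurement prefix:
#   update-memgraph-scale_1-LDBC-results.json  ->  "update.scale_1.memgraph"

# Hypothetical top-level summary inside such a file (only the fields the
# script reads are shown):
summary = {
    "unit": "MILLISECONDS",
    "total_duration": 12345.0,  # expressed in "unit"
    "total_count": 200,
    "throughput": 16.2,
    "all_metrics": [],
}

# For this input, generate_measurements would emit (durations divided by
# LDBC_TIME_FACTORS["MILLISECONDS"], i.e. normalized to seconds):
#   update.scale_1.memgraph.total_duration 12.345
#   update.scale_1.memgraph.total_count 200.0
#   update.scale_1.memgraph.throughput 16.2
```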
diff --git a/tests/qa/continuous_integration b/tests/qa/continuous_integration
index 775840231..3922b03cc 100755
--- a/tests/qa/continuous_integration
+++ b/tests/qa/continuous_integration
@@ -26,6 +26,7 @@ extra_suites = ["openCypher_M06"]
 results_folder = os.path.join("tck_engine", "results")
 suite_suffix = "memgraph-{}.json"
 qa_status_path = ".quality_assurance_status"
+measurements_path = ".apollo_measurements"
 
 
 def get_newest_path(folder, suffix):
@@ -43,15 +44,31 @@
     return os.path.join(folder, name_list.pop())
 
 
-def generate_status(suite, f, required = False):
+def generate_measurements(suite, result_path):
     """
     :param suite: Test suite name.
-    :param f: Json file with status report.
+    :param result_path: Path to the JSON status report file.
+
+    :return: Measurements string.
+    """
+    with open(result_path) as f:
+        result = json.load(f)
+    ret = ""
+    for i in ["total", "passed"]:
+        ret += "{}.{} {}\n".format(suite, i, result[i])
+    return ret
+
+
+def generate_status(suite, result_path, required = False):
+    """
+    :param suite: Test suite name.
+    :param result_path: Path to the JSON status report file.
     :param required: Adds status ticks to the message if required.
 
     :return: Status string.
     """
-    result = json.load(f)
+    with open(result_path) as f:
+        result = json.load(f)
     total = result["total"]
     passed = result["passed"]
     ratio = passed / total
@@ -101,22 +118,25 @@ if __name__ == "__main__":
                                          suite_suffix.format(memgraph_suite))
     log.info("Memgraph result path is {}".format(memgraph_result_path))
 
+    # measurements
+    measurements = ""
+
     # status table headers
     status_data = [["Suite", "Scenarios"]]
 
     # read internal scenarios
-    with open(memgraph_result_path) as f:
-        memgraph_status, memgraph_passed, memgraph_total \
-            = generate_status(memgraph_suite, f, required = True)
+    memgraph_status, memgraph_passed, memgraph_total = generate_status(
+        memgraph_suite, memgraph_result_path, required = True)
     status_data.append([memgraph_suite, memgraph_status])
+    measurements += generate_measurements(memgraph_suite, memgraph_result_path)
 
     # read extra scenarios
     for suite in extra_suites:
         result_path = get_newest_path(results_folder, suite_suffix.format(suite))
         log.info("Extra suite '{}' result path is {}".format(suite, result_path))
-        with open(result_path) as f:
-            suite_status, _, _ = generate_status(suite, f)
+        suite_status, _, _ = generate_status(suite, result_path)
         status_data.append([suite, suite_status])
+        measurements += generate_measurements(suite, result_path)
 
     # create status message
     qa_status_message = generate_remarkup(status_data)
@@ -125,7 +145,12 @@
     with open(qa_status_path, "w") as f:
         f.write(qa_status_message)
 
+    # create the measurements file
+    with open(measurements_path, "w") as f:
+        f.write(measurements)
+
     log.info("Status is generated in %s" % qa_status_path)
+    log.info("Measurements are generated in %s" % measurements_path)
 
     if memgraph_total != memgraph_passed:
         sys.exit("There is a problem with internal scenarios! \n%s"
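The QA measurements are deliberately minimal: one total and one passed count per suite. A sketch of the emitted lines for the `openCypher_M06` extra suite, with hypothetical counts:

```python
# Shape of the TCK status report read by generate_measurements and
# generate_status (counts hypothetical):
result = {"total": 120, "passed": 118}

# generate_measurements("openCypher_M06", result_path) then yields:
#   openCypher_M06.total 120
#   openCypher_M06.passed 118
```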
%s" diff --git a/tests/stress/.gitignore b/tests/stress/.gitignore new file mode 100644 index 000000000..d9d22cbe9 --- /dev/null +++ b/tests/stress/.gitignore @@ -0,0 +1 @@ +.long_running_stats diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration index ec103a055..49426be2e 100755 --- a/tests/stress/continuous_integration +++ b/tests/stress/continuous_integration @@ -72,6 +72,12 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) BUILD_DIR = os.path.join(BASE_DIR, "build") CONFIG_DIR = os.path.join(BASE_DIR, "config") +MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements") + +# long running stats file +STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats") +SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE]) +LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE]) # get number of threads if "THREADS" in os.environ: @@ -110,6 +116,8 @@ def run_test(args, test, options, timeout): runtime = time.time() - start print(" Done after {:.3f} seconds".format(runtime)) + return runtime + # parse arguments parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.") @@ -153,9 +161,11 @@ def cleanup(): proc_mg.wait() # run tests +runtimes = {} dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET for test in dataset: - run_test(args, **test) + runtime = run_test(args, **test) + runtimes[os.path.splitext(test["test"])[0]] = runtime # stop memgraph proc_mg.terminate() @@ -163,4 +173,15 @@ ret_mg = proc_mg.wait() if ret_mg != 0: raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg)) +# measurements +measurements = "" +for key, value in runtimes.items(): + measurements += "{}.runtime {}\n".format(key, value) +with open(STATS_FILE) as f: + stats = f.read().split("\n") +measurements += "long_running.queries.executed {}\n".format(stats[0]) +measurements += "long_running.queries.failed {}\n".format(stats[1]) +with open(MEASUREMENTS_FILE, "w") as f: + f.write(measurements) + print("Done!") diff --git a/tests/stress/long_running.cpp b/tests/stress/long_running.cpp index e5fefd1a7..476f4347b 100644 --- a/tests/stress/long_running.cpp +++ b/tests/stress/long_running.cpp @@ -33,6 +33,8 @@ DEFINE_int32(worker_count, 1, DEFINE_bool(global_queries, true, "If queries that modifiy globally should be executed sometimes"); +DEFINE_string(stats_file, "", "File into which to write statistics."); + /** * Encapsulates a Graph and a Bolt session and provides CRUD op functions. 
diff --git a/tests/stress/long_running.cpp b/tests/stress/long_running.cpp
index e5fefd1a7..476f4347b 100644
--- a/tests/stress/long_running.cpp
+++ b/tests/stress/long_running.cpp
@@ -33,6 +33,8 @@ DEFINE_int32(worker_count, 1,
 DEFINE_bool(global_queries, true,
             "If queries that modifiy globally should be executed sometimes");
 
+DEFINE_string(stats_file, "", "File into which to write statistics.");
+
 /**
  * Encapsulates a Graph and a Bolt session and provides CRUD op functions.
  * Also defines a run-loop for a generic exectutor, and a graph state
@@ -349,6 +351,16 @@ class GraphSession {
       }
     }
   }
+
+  uint64_t GetExecutedQueries() { return executed_queries_; }
+
+  uint64_t GetFailedQueries() {
+    uint64_t failed = 0;
+    for (const auto &item : query_failures_) {
+      failed += item.second;
+    }
+    return failed;
+  }
 };
 
 int main(int argc, char **argv) {
@@ -379,20 +391,34 @@ int main(int argc, char **argv) {
   // close client
   client.Close();
 
+  // sessions
+  std::vector<GraphSession> sessions;
+  for (int i = 0; i < FLAGS_worker_count; ++i) {
+    sessions.emplace_back(i);
+  }
+
   // workers
   std::vector<std::thread> threads;
   for (int i = 0; i < FLAGS_worker_count; ++i) {
-    threads.push_back(std::thread([&, i]() {
-      GraphSession session(i);
-      session.Run();
-    }));
+    threads.push_back(std::thread([&, i]() { sessions[i].Run(); }));
   }
   for (int i = 0; i < FLAGS_worker_count; ++i) {
     threads[i].join();
   }
 
+  if (FLAGS_stats_file != "") {
+    uint64_t executed = 0, failed = 0;
+    for (int i = 0; i < FLAGS_worker_count; ++i) {
+      executed += sessions[i].GetExecutedQueries();
+      failed += sessions[i].GetFailedQueries();
+    }
+    std::ofstream stream(FLAGS_stats_file);
+    stream << executed << std::endl << failed << std::endl;
+    LOG(INFO) << fmt::format("Statistics written to file: {}",
+                             FLAGS_stats_file);
+  }
+
   LOG(INFO) << "All query runners done";
 
   return 0;
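All three suites now write the same trivial format: one `<metric path> <value>` pair per line. A minimal sketch of a consumer under that assumption (the `parse_measurements` helper is hypothetical; Apollo's actual ingestion is not part of this diff):

```python
def parse_measurements(path):
    """Parse an .apollo_measurements file into a {metric: value} dict."""
    measurements = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate a trailing newline
            key, value = line.rsplit(" ", 1)
            measurements[key] = float(value)
    return measurements

# e.g. (values hypothetical):
#   parse_measurements(".apollo_measurements")
#   -> {"long_running.queries.executed": 734182.0,
#       "long_running.queries.failed": 12.0}
```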