diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index aae04e368..cfd79d404 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -24,3 +24,6 @@ add_subdirectory(integration)
 
 # feature benchmark test binaries
 add_subdirectory(feature_benchmark)
+
+# mgbench benchmark test binaries
+add_subdirectory(mgbench)
diff --git a/tests/mgbench/.gitignore b/tests/mgbench/.gitignore
new file mode 100644
index 000000000..16d3c4dbb
--- /dev/null
+++ b/tests/mgbench/.gitignore
@@ -0,0 +1 @@
+.cache
diff --git a/tests/mgbench/CMakeLists.txt b/tests/mgbench/CMakeLists.txt
new file mode 100644
index 000000000..8f542b446
--- /dev/null
+++ b/tests/mgbench/CMakeLists.txt
@@ -0,0 +1,5 @@
+set(test_prefix memgraph__mgbench__)
+
+add_executable(${test_prefix}client client.cpp)
+set_target_properties(${test_prefix}client PROPERTIES OUTPUT_NAME client)
+target_link_libraries(${test_prefix}client mg-communication json)
diff --git a/tests/mgbench/benchmark.py b/tests/mgbench/benchmark.py
new file mode 100755
index 000000000..959943325
--- /dev/null
+++ b/tests/mgbench/benchmark.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+
+import argparse
+import collections
+import copy
+import fnmatch
+import inspect
+import json
+import multiprocessing
+import random
+import sys
+
+import datasets
+import helpers
+import log
+import runners
+
+
+def get_queries(gen, count):
+    # Make the generator deterministic.
+    random.seed(gen.__name__)
+    # Generate queries.
+    ret = []
+    for i in range(count):
+        ret.append(gen())
+    return ret
+
+
+def match_patterns(dataset, variant, group, test, is_default_variant,
+                   patterns):
+    for pattern in patterns:
+        verdict = [fnmatch.fnmatchcase(dataset, pattern[0])]
+        if pattern[1] != "":
+            verdict.append(fnmatch.fnmatchcase(variant, pattern[1]))
+        else:
+            verdict.append(is_default_variant)
+        verdict.append(fnmatch.fnmatchcase(group, pattern[2]))
+        verdict.append(fnmatch.fnmatchcase(test, pattern[3]))
+        if all(verdict):
+            return True
+    return False
+
+
+def filter_benchmarks(generators, patterns):
+    patterns = copy.deepcopy(patterns)
+    for i in range(len(patterns)):
+        pattern = patterns[i].split("/")
+        if len(pattern) > 4 or len(pattern) == 0:
+            raise Exception("Invalid benchmark description '" +
+                            patterns[i] + "'!")
+        pattern.extend(["", "*", "*"][len(pattern) - 1:])
+        patterns[i] = pattern
+    filtered = []
+    for dataset in sorted(generators.keys()):
+        generator, tests = generators[dataset]
+        for variant in generator.VARIANTS:
+            is_default_variant = variant == generator.DEFAULT_VARIANT
+            current = collections.defaultdict(list)
+            for group in tests:
+                for test_name, test_func in tests[group]:
+                    if match_patterns(dataset, variant, group, test_name,
+                                      is_default_variant, patterns):
+                        current[group].append((test_name, test_func))
+            if len(current) > 0:
+                filtered.append((generator(variant), dict(current)))
+    return filtered
+
+
+# Parse options.
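+# The positional `benchmarks` arguments parsed below are filter patterns of
+# the form dataset[/variant[/group[/test]]] (see filter_benchmarks above).
+# As an illustration with hypothetical values: "pokec/*/arango/*" selects
+# every test in the "arango" group across all Pokec variants, while a bare
+# "pokec" selects only the default variant, because the pattern is padded to
+# ["pokec", "", "*", "*"] and an empty variant field matches the default:
+#
+#   match_patterns("pokec", "small", "arango", "expansion_1",
+#                  is_default_variant=True,
+#                  patterns=[["pokec", "", "*", "*"]])  # -> True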
+parser = argparse.ArgumentParser(
+    description="Memgraph benchmark executor.",
+    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument("benchmarks", nargs="*", default="",
+                    help="descriptions of benchmarks that should be run; "
+                    "multiple descriptions can be specified to run multiple "
+                    "benchmarks; the description is specified as "
+                    "dataset/variant/group/test; Unix shell-style wildcards "
+                    "can be used in the descriptions; variant, group and test "
+                    "are optional and they can be left out; the default "
+                    "variant is '' which selects the default dataset variant; "
+                    "the default group is '*' which selects all groups; the "
+                    "default test is '*' which selects all tests")
+parser.add_argument("--memgraph-binary",
+                    default=helpers.get_binary_path("memgraph"),
+                    help="Memgraph binary used for benchmarking")
+parser.add_argument("--client-binary",
+                    default=helpers.get_binary_path("tests/mgbench/client"),
+                    help="client binary used for benchmarking")
+parser.add_argument("--num-workers-for-import", type=int,
+                    default=multiprocessing.cpu_count() // 2,
+                    help="number of workers used to import the dataset")
+parser.add_argument("--num-workers-for-benchmark", type=int,
+                    default=1,
+                    help="number of workers used to execute the benchmark")
+parser.add_argument("--single-threaded-runtime-sec", type=int,
+                    default=10,
+                    help="single-threaded duration of each test")
+parser.add_argument("--no-load-query-counts", action="store_true",
+                    help="disable loading of cached query counts")
+parser.add_argument("--no-save-query-counts", action="store_true",
+                    help="disable storing of cached query counts")
+parser.add_argument("--export-results", default="",
+                    help="file path into which results should be exported")
+parser.add_argument("--temporary-directory", default="/tmp",
+                    help="directory path where temporary data should "
+                    "be stored")
+parser.add_argument("--no-properties-on-edges", action="store_true",
+                    help="disable properties on edges")
+args = parser.parse_args()
+
+# Detect available datasets.
+generators = {}
+for key in dir(datasets):
+    if key.startswith("_"):
+        continue
+    dataset = getattr(datasets, key)
+    if not inspect.isclass(dataset) or dataset == datasets.Dataset or \
+            not issubclass(dataset, datasets.Dataset):
+        continue
+    tests = collections.defaultdict(list)
+    for funcname in dir(dataset):
+        if not funcname.startswith("benchmark__"):
+            continue
+        group, test = funcname.split("__")[1:]
+        tests[group].append((test, funcname))
+    generators[dataset.NAME] = (dataset, dict(tests))
+    if dataset.PROPERTIES_ON_EDGES and args.no_properties_on_edges:
+        raise Exception("The \"{}\" dataset requires properties on edges, "
+                        "but you have disabled them!".format(dataset.NAME))
+
+# List datasets if there is no specified dataset.
+if len(args.benchmarks) == 0:
+    log.init("Available tests")
+    for name in sorted(generators.keys()):
+        print("Dataset:", name)
+        dataset, tests = generators[name]
+        print("    Variants:", ", ".join(dataset.VARIANTS),
+              "(default: " + dataset.DEFAULT_VARIANT + ")")
+        for group in sorted(tests.keys()):
+            print("    Group:", group)
+            for test_name, test_func in tests[group]:
+                print("        Test:", test_name)
+    sys.exit(0)
+
+# Create cache, config and results objects.
+cache = helpers.Cache()
+if not args.no_load_query_counts:
+    config = cache.load_config()
+else:
+    config = helpers.RecursiveDict()
+results = helpers.RecursiveDict()
+
+# Filter out the generators.
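+# `filter_benchmarks` yields (dataset instance, tests) pairs; sketched with
+# hypothetical content, a single element looks like:
+#   (Pokec("small"),
+#    {"arango": [("aggregate", "benchmark__arango__aggregate"), ...]})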
+benchmarks = filter_benchmarks(generators, args.benchmarks)
+
+# Run all specified benchmarks.
+for dataset, tests in benchmarks:
+    log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(),
+             "dataset")
+    dataset.prepare(cache.cache_directory("datasets", dataset.NAME,
+                                          dataset.get_variant()))
+
+    # Prepare runners and import the dataset.
+    memgraph = runners.Memgraph(args.memgraph_binary, args.temporary_directory,
+                                not args.no_properties_on_edges)
+    client = runners.Client(args.client_binary, args.temporary_directory)
+    memgraph.start_preparation()
+    ret = client.execute(file_path=dataset.get_file(),
+                         num_workers=args.num_workers_for_import)
+    usage = memgraph.stop()
+
+    # Display import statistics.
+    print()
+    for row in ret:
+        print("Executed", row["count"], "queries in", row["duration"],
+              "seconds using", row["num_workers"],
+              "workers with a total throughput of", row["throughput"],
+              "queries/second.")
+    print()
+    print("The database used", usage["cpu"],
+          "seconds of CPU time and peaked at",
+          usage["memory"] / 1024 / 1024, "MiB of RAM.")
+
+    # Save import results.
+    import_key = [dataset.NAME, dataset.get_variant(), "__import__"]
+    results.set_value(*import_key, value={"client": ret, "database": usage})
+
+    # TODO: cache import data
+
+    # Run all benchmarks in all available groups.
+    for group in sorted(tests.keys()):
+        for test, funcname in tests[group]:
+            log.info("Running test:", "{}/{}".format(group, test))
+            func = getattr(dataset, funcname)
+
+            # Get number of queries to execute.
+            # TODO: implement minimum number of queries, `max(10, num_workers)`
+            config_key = [dataset.NAME, dataset.get_variant(), group, test]
+            cached_count = config.get_value(*config_key)
+            if cached_count is None:
+                print("Determining the number of queries necessary for",
+                      args.single_threaded_runtime_sec,
+                      "seconds of single-threaded runtime...")
+                # First run to prime the query caches.
+                memgraph.start_benchmark()
+                client.execute(queries=get_queries(func, 1), num_workers=1)
+                # Get a sense of the runtime.
+                count = 1
+                while True:
+                    ret = client.execute(queries=get_queries(func, count),
+                                         num_workers=1)
+                    duration = ret[0]["duration"]
+                    should_execute = int(args.single_threaded_runtime_sec /
+                                         (duration / count))
+                    print("executed_queries={}, total_duration={}, "
+                          "query_duration={}, estimated_count={}".format(
+                              count, duration, duration / count,
+                              should_execute))
+                    # We don't have to execute the next iteration when
+                    # `should_execute` becomes the same order of magnitude as
+                    # `count * 10`.
+                    if should_execute / (count * 10) < 10:
+                        count = should_execute
+                        break
+                    else:
+                        count = count * 10
+                memgraph.stop()
+                config.set_value(*config_key, value={
+                    "count": count,
+                    "duration": args.single_threaded_runtime_sec})
+            else:
+                print("Using cached query count of", cached_count["count"],
+                      "queries for", cached_count["duration"],
+                      "seconds of single-threaded runtime.")
+                count = int(cached_count["count"] *
+                            args.single_threaded_runtime_sec /
+                            cached_count["duration"])
+
+            # Benchmark run.
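+            # Because get_queries() seeds the RNG with the generator's name,
+            # the query stream below is deterministic per test: repeated runs
+            # of the same test benchmark an identical workload.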
+ print("Sample query:", get_queries(func, 1)[0][0]) + print("Executing benchmark with", count, "queries that should " + "yield a single-threaded runtime of", + args.single_threaded_runtime_sec, "seconds.") + print("Queries are executed using", args.num_workers_for_benchmark, + "concurrent clients.") + memgraph.start_benchmark() + ret = client.execute(queries=get_queries(func, count), + num_workers=args.num_workers_for_benchmark)[0] + usage = memgraph.stop() + ret["database"] = usage + + # Output summary. + print() + print("Executed", ret["count"], "queries in", + ret["duration"], "seconds.") + print("Queries have been retried", ret["retries"], "times.") + print("Database used {:.3f} seconds of CPU time.".format( + usage["cpu"])) + print("Database peaked at {:.3f} MiB of memory.".format( + usage["memory"] / 1024.0 / 1024.0)) + print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", + "avg", "max")) + metadata = ret["metadata"] + for key in sorted(metadata.keys()): + print("{name:>30}: {minimum:>20.06f} {average:>20.06f} " + "{maximum:>20.06f}".format(name=key, **metadata[key])) + log.success("Throughput: {:02f} QPS".format(ret["throughput"])) + + # Save results. + results_key = [dataset.NAME, dataset.get_variant(), group, test] + results.set_value(*results_key, value=ret) + +# Save configuration. +if not args.no_save_query_counts: + cache.save_config(config) + +# Export results. +if args.export_results: + with open(args.export_results, "w") as f: + json.dump(results.get_data(), f) diff --git a/tests/mgbench/client.cpp b/tests/mgbench/client.cpp new file mode 100644 index 000000000..17da3cd9d --- /dev/null +++ b/tests/mgbench/client.cpp @@ -0,0 +1,299 @@ +#include <algorithm> +#include <atomic> +#include <chrono> +#include <filesystem> +#include <fstream> +#include <limits> +#include <map> +#include <string> +#include <thread> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <json/json.hpp> + +#include "communication/bolt/client.hpp" +#include "communication/bolt/v1/value.hpp" +#include "utils/exceptions.hpp" +#include "utils/string.hpp" +#include "utils/timer.hpp" + +DEFINE_string(address, "127.0.0.1", "Server address."); +DEFINE_int32(port, 7687, "Server port."); +DEFINE_string(username, "", "Username for the database."); +DEFINE_string(password, "", "Password for the database."); +DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); + +DEFINE_uint64(num_workers, 1, + "Number of workers that should be used to concurrently execute " + "the supplied queries."); +DEFINE_uint64(max_retries, 50, "Maximum number of retries for each query."); +DEFINE_bool( + queries_json, false, + "Set to true to load all queries as as single JSON encoded list. Each item " + "in the list should contain another list whose first element is the query " + "that should be executed and the second element should be a dictionary of " + "query parameters for that query."); + +DEFINE_string(input, "", "Input file. By default stdin is used."); +DEFINE_string(output, "", "Output file. 
+
+std::pair<std::map<std::string, communication::bolt::Value>, uint64_t>
+ExecuteNTimesTillSuccess(
+    communication::bolt::Client *client, const std::string &query,
+    const std::map<std::string, communication::bolt::Value> &params,
+    uint64_t max_attempts) {
+  for (uint64_t i = 0; i < max_attempts; ++i) {
+    try {
+      auto ret = client->Execute(query, params);
+      return {std::move(ret.metadata), i};
+    } catch (const utils::BasicException &e) {
+      if (i == max_attempts - 1) {
+        LOG(FATAL) << "Could not execute query '" << query << "' "
+                   << max_attempts << " times! Error message: " << e.what();
+      } else {
+        continue;
+      }
+    }
+  }
+  LOG(FATAL) << "Could not execute query '" << query << "' " << max_attempts
+             << " times!";
+}
+
+communication::bolt::Value JsonToBoltValue(const nlohmann::json &data) {
+  switch (data.type()) {
+    case nlohmann::json::value_t::null:
+      return {};
+    case nlohmann::json::value_t::boolean:
+      return {data.get<bool>()};
+    case nlohmann::json::value_t::string:
+      return {data.get<std::string>()};
+    case nlohmann::json::value_t::number_integer:
+      return {data.get<int64_t>()};
+    case nlohmann::json::value_t::number_unsigned:
+      return {static_cast<int64_t>(data.get<uint64_t>())};
+    case nlohmann::json::value_t::number_float:
+      return {data.get<double>()};
+    case nlohmann::json::value_t::array: {
+      std::vector<communication::bolt::Value> vec;
+      vec.reserve(data.size());
+      for (const auto &item : data.get<nlohmann::json::array_t>()) {
+        vec.emplace_back(JsonToBoltValue(item));
+      }
+      return {std::move(vec)};
+    }
+    case nlohmann::json::value_t::object: {
+      std::map<std::string, communication::bolt::Value> map;
+      for (const auto &item : data.get<nlohmann::json::object_t>()) {
+        map.emplace(item.first, JsonToBoltValue(item.second));
+      }
+      return {std::move(map)};
+    }
+    case nlohmann::json::value_t::discarded:
+      LOG(FATAL) << "Unexpected JSON type!";
+  }
+}
+
+class Metadata final {
+ private:
+  struct Record {
+    uint64_t count{0};
+    double average{0.0};
+    double minimum{std::numeric_limits<double>::infinity()};
+    double maximum{-std::numeric_limits<double>::infinity()};
+  };
+
+ public:
+  void Append(const std::map<std::string, communication::bolt::Value> &values) {
+    for (const auto &item : values) {
+      if (!item.second.IsInt() && !item.second.IsDouble()) continue;
+      auto [it, emplaced] = storage_.emplace(item.first, Record());
+      auto &record = it->second;
+      double value = 0.0;
+      if (item.second.IsInt()) {
+        value = item.second.ValueInt();
+      } else {
+        value = item.second.ValueDouble();
+      }
+      ++record.count;
+      // `average` accumulates a running sum here; Export() divides it by
+      // `count` to produce the mean.
+      record.average += value;
+      record.minimum = std::min(record.minimum, value);
+      record.maximum = std::max(record.maximum, value);
+    }
+  }
+
+  nlohmann::json Export() {
+    nlohmann::json data = nlohmann::json::object();
+    for (const auto &item : storage_) {
+      nlohmann::json row = nlohmann::json::object();
+      row["average"] = item.second.average / item.second.count;
+      row["minimum"] = item.second.minimum;
+      row["maximum"] = item.second.maximum;
+      data[item.first] = row;
+    }
+    return data;
+  }
+
+  Metadata &operator+=(const Metadata &other) {
+    for (const auto &item : other.storage_) {
+      auto [it, emplaced] = storage_.emplace(item.first, Record());
+      auto &record = it->second;
+      record.count += item.second.count;
+      record.average += item.second.average;
+      record.minimum = std::min(record.minimum, item.second.minimum);
+      record.maximum = std::max(record.maximum, item.second.maximum);
+    }
+    return *this;
+  }
+
+ private:
+  std::map<std::string, Record> storage_;
+};
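+
+// Sketch of the intended Metadata flow (values hypothetical): each worker
+// calls Append() per executed query, the per-worker objects are merged with
+// operator+=, and Export() turns the running sum kept in `average` into a
+// true mean while min/max stay exact:
+//
+//   Metadata a, b;
+//   a.Append({{"parsing_time", communication::bolt::Value(0.001)}});
+//   b.Append({{"parsing_time", communication::bolt::Value(0.003)}});
+//   a += b;
+//   nlohmann::json stats = a.Export();  // parsing_time average = 0.002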
+
+void Execute(
+    const std::vector<std::pair<
+        std::string, std::map<std::string, communication::bolt::Value>>>
+        &queries,
+    std::ostream *stream) {
+  std::vector<std::thread> threads;
+  threads.reserve(FLAGS_num_workers);
+
+  std::vector<uint64_t> worker_retries(FLAGS_num_workers, 0);
+  std::vector<Metadata> worker_metadata(FLAGS_num_workers, Metadata());
+  std::vector<double> worker_duration(FLAGS_num_workers, 0.0);
+
+  // Start workers and execute queries.
+  auto size = queries.size();
+  std::atomic<bool> run(false);
+  std::atomic<uint64_t> ready(0);
+  std::atomic<uint64_t> position(0);
+  for (uint64_t worker = 0; worker < FLAGS_num_workers; ++worker) {
+    threads.push_back(std::thread([&, worker]() {
+      io::network::Endpoint endpoint(FLAGS_address, FLAGS_port);
+      communication::ClientContext context(FLAGS_use_ssl);
+      communication::bolt::Client client(&context);
+      client.Connect(endpoint, FLAGS_username, FLAGS_password);
+
+      ready.fetch_add(1, std::memory_order_acq_rel);
+      while (!run.load(std::memory_order_acquire))
+        ;
+
+      auto &retries = worker_retries[worker];
+      auto &metadata = worker_metadata[worker];
+      auto &duration = worker_duration[worker];
+      utils::Timer timer;
+      while (true) {
+        auto pos = position.fetch_add(1, std::memory_order_acq_rel);
+        if (pos >= size) break;
+        const auto &query = queries[pos];
+        auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second,
+                                            FLAGS_max_retries);
+        retries += ret.second;
+        metadata.Append(ret.first);
+      }
+      duration = timer.Elapsed().count();
+      client.Close();
+    }));
+  }
+
+  // Synchronize workers and collect runtime.
+  while (ready.load(std::memory_order_acquire) < FLAGS_num_workers)
+    ;
+  run.store(true, std::memory_order_release);
+  for (uint64_t i = 0; i < FLAGS_num_workers; ++i) {
+    threads[i].join();
+  }
+
+  // Create and output summary.
+  Metadata final_metadata;
+  uint64_t final_retries = 0;
+  double final_duration = 0.0;
+  for (uint64_t i = 0; i < FLAGS_num_workers; ++i) {
+    final_metadata += worker_metadata[i];
+    final_retries += worker_retries[i];
+    final_duration += worker_duration[i];
+  }
+  final_duration /= FLAGS_num_workers;
+  nlohmann::json summary = nlohmann::json::object();
+  summary["count"] = queries.size();
+  summary["duration"] = final_duration;
+  summary["throughput"] = static_cast<double>(queries.size()) / final_duration;
+  summary["retries"] = final_retries;
+  summary["metadata"] = final_metadata.Export();
+  summary["num_workers"] = FLAGS_num_workers;
+  (*stream) << summary.dump() << std::endl;
+}
+
+int main(int argc, char **argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+
+  communication::Init();
+
+  std::ifstream ifile;
+  std::istream *istream{&std::cin};
+  if (FLAGS_input != "") {
+    CHECK(std::filesystem::is_regular_file(FLAGS_input))
+        << "Input file isn't a regular file or it doesn't exist!";
+    ifile.open(FLAGS_input);
+    CHECK(ifile) << "Couldn't open input file!";
+    istream = &ifile;
+  }
+
+  std::ofstream ofile;
+  std::ostream *ostream{&std::cout};
+  if (FLAGS_output != "") {
+    ofile.open(FLAGS_output);
+    CHECK(ofile) << "Couldn't open output file!";
+    ostream = &ofile;
+  }
+
+  std::vector<
+      std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
+      queries;
+  if (!FLAGS_queries_json) {
+    // Load simple queries.
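+    // Example input in this mode (queries hypothetical): one query per line;
+    // a blank line or a line holding just ";" executes everything accumulated
+    // so far as a single batch:
+    //
+    //   CREATE (:Node {id: 1})
+    //   CREATE (:Node {id: 2})
+    //   ;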
+    std::string query;
+    while (std::getline(*istream, query)) {
+      auto trimmed = utils::Trim(query);
+      if (trimmed == "" || trimmed == ";") {
+        Execute(queries, ostream);
+        queries.clear();
+        continue;
+      }
+      queries.emplace_back(query,
+                           std::map<std::string, communication::bolt::Value>{});
+    }
+  } else {
+    // Load advanced queries.
+    std::string row;
+    while (std::getline(*istream, row)) {
+      auto data = nlohmann::json::parse(row);
+      CHECK(data.is_array())
+          << "Each item of the loaded JSON queries must be an array!";
+      // An empty array flushes the accumulated batch, mirroring the blank
+      // line in the simple format.
+      if (data.size() == 0) {
+        Execute(queries, ostream);
+        queries.clear();
+        continue;
+      }
+      CHECK(data.size() == 2)
+          << "Each item of the loaded JSON queries that has "
+             "data must be an array of length 2!";
+      const auto &query = data[0];
+      const auto &param = data[1];
+      CHECK(query.is_string() && param.is_object())
+          << "The query must be a string and the parameters must be a "
+             "dictionary!";
+      auto bolt_param = JsonToBoltValue(param);
+      CHECK(bolt_param.IsMap()) << "The Bolt parameters must be a map!";
+      queries.emplace_back(query, std::move(bolt_param.ValueMap()));
+    }
+  }
+  Execute(queries, ostream);
+
+  return 0;
+}
diff --git a/tests/mgbench/compare_results.py b/tests/mgbench/compare_results.py
new file mode 100755
index 000000000..c24144f1a
--- /dev/null
+++ b/tests/mgbench/compare_results.py
@@ -0,0 +1,175 @@
+#!/usr/bin/env python3
+
+import argparse
+import json
+
+
+FIELDS = [
+    {
+        "name": "throughput",
+        "positive_diff_better": True,
+        "scaling": 1,
+        "unit": "QPS",
+        "diff_threshold": 0.05,  # 5%
+    },
+    {
+        "name": "duration",
+        "positive_diff_better": False,
+        "scaling": 1,
+        "unit": "s",
+    },
+    {
+        "name": "parsing_time",
+        "positive_diff_better": False,
+        "scaling": 1000,
+        "unit": "ms",
+    },
+    {
+        "name": "planning_time",
+        "positive_diff_better": False,
+        "scaling": 1000,
+        "unit": "ms",
+    },
+    {
+        "name": "plan_execution_time",
+        "positive_diff_better": False,
+        "scaling": 1000,
+        "unit": "ms",
+    },
+    {
+        "name": "memory",
+        "positive_diff_better": False,
+        "scaling": 1 / 1024 / 1024,
+        "unit": "MiB",
+        "diff_threshold": 0.02,  # 2%
+    },
+]
+
+
+def load_results(fname):
+    with open(fname) as f:
+        return json.load(f)
+
+
+def compute_diff(value_from, value_to):
+    if value_from is None:
+        return {"value": value_to}
+    diff = (value_to - value_from) / value_from
+    return {"value": value_to, "diff": diff}
+
+
+def recursive_get(data, *args, value=None):
+    for arg in args:
+        if arg not in data:
+            return value
+        data = data[arg]
+    return data
+
+
+def compare_results(results_from, results_to, fields):
+    ret = {}
+    for dataset, variants in results_to.items():
+        for variant, groups in variants.items():
+            for group, scenarios in groups.items():
+                if group == "__import__":
+                    continue
+                for scenario, summary_to in scenarios.items():
+                    summary_from = recursive_get(
+                        results_from, dataset, variant, group, scenario,
+                        value={})
+                    if len(summary_from) > 0 and \
+                            (summary_to["count"] != summary_from["count"] or
+                             summary_to["num_workers"] !=
+                             summary_from["num_workers"]):
+                        raise Exception("Incompatible results!")
+                    testcode = "/".join([dataset, variant, group, scenario,
+                                         "{:02d}".format(
+                                             summary_to["num_workers"])])
+                    row = {}
+                    performance_changed = False
+                    for field in fields:
+                        key = field["name"]
+                        if key in summary_to:
+                            row[key] = compute_diff(
+                                summary_from.get(key, None),
+                                summary_to[key])
+                        elif key in summary_to["database"]:
summary_to["database"]: + row[key] = compute_diff( + recursive_get(summary_from, "database", key, + value=None), + summary_to["database"][key]) + else: + row[key] = compute_diff( + recursive_get(summary_from, "metadata", key, + "average", value=None), + summary_to["metadata"][key]["average"]) + if "diff" not in row[key] or \ + ("diff_treshold" in field and + abs(row[key]["diff"]) >= + field["diff_treshold"]): + performance_changed = True + if performance_changed: + ret[testcode] = row + return ret + + +def generate_remarkup(fields, data): + ret = "==== Benchmark summary: ====\n\n" + if len(data) > 0: + ret += "<table>\n" + ret += " <tr>\n" + ret += " <th>Testcode</th>\n" + ret += "\n".join(map(lambda x: " <th>{}</th>".format( + x["name"].replace("_", " ").capitalize()), fields)) + "\n" + ret += " </tr>\n" + for testcode in sorted(data.keys()): + ret += " <tr>\n" + ret += " <td>{}</td>\n".format(testcode) + for field in fields: + result = data[testcode][field["name"]] + value = result["value"] * field["scaling"] + if "diff" in result: + diff = result["diff"] + arrow = "arrow-up" if diff >= 0 else "arrow-down" + if not (field["positive_diff_better"] ^ (diff >= 0)): + color = "green" + else: + color = "red" + sign = "{{icon {} color={}}}".format(arrow, color) + ret += " <td>{:.3f}{} //({:+.2%})// {}</td>\n".format( + value, field["unit"], diff, sign) + else: + ret += " <td>{:.3f}{} //(new)// " \ + "{{icon plus color=blue}}</td>\n".format( + value, field["unit"]) + ret += " </tr>\n" + ret += "</table>\n" + else: + ret += "No performance change detected.\n" + return ret + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Compare results of multiple benchmark runs.") + parser.add_argument("--compare", action="append", nargs=2, + metavar=("from", "to"), + help="compare results between `from` and `to` files") + parser.add_argument("--output", default="", help="output file name") + args = parser.parse_args() + + if args.compare is None or len(args.compare) == 0: + raise Exception("You must specify at least one pair of files!") + + data = {} + for file_from, file_to in args.compare: + results_from = load_results(file_from) + results_to = load_results(file_to) + data.update(compare_results(results_from, results_to, FIELDS)) + + remarkup = generate_remarkup(FIELDS, data) + if args.output: + with open(args.output, "w") as f: + f.write(remarkup) + else: + print(remarkup, end="") diff --git a/tests/mgbench/datasets.py b/tests/mgbench/datasets.py new file mode 100644 index 000000000..868c0adbe --- /dev/null +++ b/tests/mgbench/datasets.py @@ -0,0 +1,279 @@ +import random + +import helpers + + +# Base dataset class used as a template to create each individual dataset. All +# common logic is handled here. +class Dataset: + # Name of the dataset. + NAME = "Base dataset" + # List of all variants of the dataset that exist. + VARIANTS = ["default"] + # One of the available variants that should be used as the default variant. + DEFAULT_VARIANT = "default" + # List of query files that should be used to import the dataset. + FILES = { + "default": "/foo/bar", + } + # List of query file URLs that should be used to import the dataset. + URLS = None + # Number of vertices/edges for each variant. + SIZES = { + "default": {"vertices": 0, "edges": 0}, + } + # Indicates whether the dataset has properties on edges. + PROPERTIES_ON_EDGES = False + + def __init__(self, variant=None): + """ + Accepts a `variant` variable that indicates which variant + of the dataset should be executed. 
+ """ + if variant is None: + variant = self.DEFAULT_VARIANT + if variant not in self.VARIANTS: + raise ValueError("Invalid test variant!") + if (self.FILES and variant not in self.FILES) and \ + (self.URLS and variant not in self.URLS): + raise ValueError("The variant doesn't have a defined URL or " + "file path!") + if variant not in self.SIZES: + raise ValueError("The variant doesn't have a defined dataset " + "size!") + self._variant = variant + if self.FILES is not None: + self._file = self.FILES.get(variant, None) + else: + self._file = None + if self.URLS is not None: + self._url = self.URLS.get(variant, None) + else: + self._url = None + self._size = self.SIZES[variant] + if "vertices" not in self._size or "edges" not in self._size: + raise ValueError("The size defined for this variant doesn't " + "have the number of vertices and/or edges!") + self._num_vertices = self._size["vertices"] + self._num_edges = self._size["edges"] + + def prepare(self, directory): + if self._file is not None: + print("Using dataset file:", self._file) + return + # TODO: add support for JSON datasets + cached_input, exists = directory.get_file("dataset.cypher") + if not exists: + print("Downloading dataset file:", self._url) + downloaded_file = helpers.download_file( + self._url, directory.get_path()) + print("Unpacking and caching file:", downloaded_file) + helpers.unpack_and_move_file(downloaded_file, cached_input) + print("Using cached dataset file:", cached_input) + self._file = cached_input + + def get_variant(self): + """Returns the current variant of the dataset.""" + return self._variant + + def get_file(self): + """ + Returns path to the file that contains dataset creation queries. + """ + return self._file + + def get_size(self): + """Returns number of vertices/edges for the current variant.""" + return self._size + + # All tests should be query generator functions that output all of the + # queries that should be executed by the runner. The functions should be + # named `benchmark__GROUPNAME__TESTNAME` and should not accept any + # arguments. + + +class Pokec(Dataset): + NAME = "pokec" + VARIANTS = ["small", "medium", "large"] + DEFAULT_VARIANT = "small" + FILES = None + URLS = { + "small": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_small.setup.cypher", + "medium": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_medium.setup.cypher", + "large": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_large.setup.cypher.gz", + } + SIZES = { + "small": {"vertices": 10000, "edges": 121716}, + "medium": {"vertices": 100000, "edges": 1768515}, + "large": {"vertices": 1632803, "edges": 30622564}, + } + PROPERTIES_ON_EDGES = False + + # Helpers used to generate the queries + + def _get_random_vertex(self): + # All vertices in the Pokec dataset have an ID in the range + # [1, _num_vertices]. 
+        return random.randint(1, self._num_vertices)
+
+    def _get_random_from_to(self):
+        vertex_from = self._get_random_vertex()
+        vertex_to = vertex_from
+        while vertex_to == vertex_from:
+            vertex_to = self._get_random_vertex()
+        return (vertex_from, vertex_to)
+
+    # Arango benchmarks
+
+    def benchmark__arango__single_vertex_read(self):
+        return ("MATCH (n:User {id : $id}) RETURN n",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__single_vertex_write(self):
+        return ("CREATE (n:UserTemp {id : $id}) RETURN n",
+                {"id": random.randint(1, self._num_vertices * 10)})
+
+    def benchmark__arango__single_edge_write(self):
+        vertex_from, vertex_to = self._get_random_from_to()
+        return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
+                "CREATE (n)-[e:Temp]->(m) RETURN e",
+                {"from": vertex_from, "to": vertex_to})
+
+    def benchmark__arango__aggregate(self):
+        return ("MATCH (n:User) RETURN n.age, COUNT(*)", {})
+
+    def benchmark__arango__aggregate_with_filter(self):
+        return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {})
+
+    def benchmark__arango__expansion_1(self):
+        return ("MATCH (s:User {id: $id})-->(n:User) "
+                "RETURN n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__expansion_1_with_filter(self):
+        return ("MATCH (s:User {id: $id})-->(n:User) "
+                "WHERE n.age >= 18 "
+                "RETURN n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__expansion_2(self):
+        return ("MATCH (s:User {id: $id})-->()-->(n:User) "
+                "RETURN DISTINCT n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__expansion_2_with_filter(self):
+        return ("MATCH (s:User {id: $id})-->()-->(n:User) "
+                "WHERE n.age >= 18 "
+                "RETURN DISTINCT n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__expansion_3(self):
+        return ("MATCH (s:User {id: $id})-->()-->()-->(n:User) "
+                "RETURN DISTINCT n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__expansion_3_with_filter(self):
+        return ("MATCH (s:User {id: $id})-->()-->()-->(n:User) "
+                "WHERE n.age >= 18 "
+                "RETURN DISTINCT n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__expansion_4(self):
+        return ("MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) "
+                "RETURN DISTINCT n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__expansion_4_with_filter(self):
+        return ("MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) "
+                "WHERE n.age >= 18 "
+                "RETURN DISTINCT n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__neighbours_2(self):
+        return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
+                "RETURN DISTINCT n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__neighbours_2_with_filter(self):
+        return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
+                "WHERE n.age >= 18 "
+                "RETURN DISTINCT n.id",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__neighbours_2_with_data(self):
+        return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
+                "RETURN DISTINCT n.id, n",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__neighbours_2_with_data_and_filter(self):
+        return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
+                "WHERE n.age >= 18 "
+                "RETURN DISTINCT n.id, n",
+                {"id": self._get_random_vertex()})
+
+    def benchmark__arango__shortest_path(self):
+        vertex_from, vertex_to = self._get_random_from_to()
+        return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
+                "MATCH p=(n)-[*bfs..15]->(m) "
+                "RETURN extract(n in nodes(p) | n.id) AS path",
+                {"from": vertex_from, "to": vertex_to})
"to": vertex_to}) + + def benchmark__arango__shortest_path_with_filter(self): + vertex_from, vertex_to = self._get_random_from_to() + return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " + "MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) " + "RETURN extract(n in nodes(p) | n.id) AS path", + {"from": vertex_from, "to": vertex_to}) + + # Our benchmark queries + + def benchmark__create__edge(self): + vertex_from, vertex_to = self._get_random_from_to() + return ("MATCH (a:User {id: $from}), (b:User {id: $to}) " + "CREATE (a)-[:TempEdge]->(b)", + {"from": vertex_from, "to": vertex_to}) + + def benchmark__create__pattern(self): + return ("CREATE ()-[:TempEdge]->()", {}) + + def benchmark__create__vertex(self): + return ("CREATE ()", {}) + + def benchmark__create__vertex_big(self): + return ("CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, " + "p3: \"Here is some text that is not extremely short\", " + "p4:\"Short text\", p5: 234.434, p6: 11.11, p7: false})", {}) + + def benchmark__aggregation__count(self): + return ("MATCH (n) RETURN count(n), count(n.age)", {}) + + def benchmark__aggregation__min_max_avg(self): + return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {}) + + def benchmark__match__pattern_cycle(self): + return ("MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " + "RETURN e1, m, e2", + {"id": self._get_random_vertex()}) + + def benchmark__match__pattern_long(self): + return ("MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->" + "(n3)-[e3]->(n4)<-[e4]-(n5) " + "RETURN n5 LIMIT 1", + {"id": self._get_random_vertex()}) + + def benchmark__match__pattern_short(self): + return ("MATCH (n:User {id: $id})-[e]->(m) " + "RETURN m LIMIT 1", + {"id": self._get_random_vertex()}) + + def benchmark__match__vertex_on_label_property(self): + return ("MATCH (n:User) WITH n WHERE n.id = $id RETURN n", + {"id": self._get_random_vertex()}) + + def benchmark__match__vertex_on_label_property_index(self): + return ("MATCH (n:User {id: $id}) RETURN n", + {"id": self._get_random_vertex()}) + + def benchmark__match__vertex_on_property(self): + return ("MATCH (n {id: $id}) RETURN n", + {"id": self._get_random_vertex()}) diff --git a/tests/mgbench/helpers.py b/tests/mgbench/helpers.py new file mode 100644 index 000000000..ecfa80f1a --- /dev/null +++ b/tests/mgbench/helpers.py @@ -0,0 +1,103 @@ +import copy +import json +import os +import subprocess + + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) + + +def get_binary_path(path, base=""): + dirpath = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) + if os.path.exists(os.path.join(dirpath, "build_release")): + dirpath = os.path.join(dirpath, "build_release") + else: + dirpath = os.path.join(dirpath, "build") + return os.path.join(dirpath, path) + + +def download_file(url, path): + ret = subprocess.run(["wget", "-nv", "--content-disposition", url], + stderr=subprocess.PIPE, cwd=path, check=True) + data = ret.stderr.decode("utf-8") + tmp = data.split("->")[1] + name = tmp[tmp.index('"') + 1:tmp.rindex('"')] + return os.path.join(path, name) + + +def unpack_and_move_file(input_path, output_path): + if input_path.endswith(".gz"): + subprocess.run(["gunzip", input_path], + stdout=subprocess.DEVNULL, check=True) + input_path = input_path[:-3] + os.rename(input_path, output_path) + + +def ensure_directory(path): + if not os.path.exists(path): + os.makedirs(path) + if not os.path.isdir(path): + raise Exception("The path '{}' should be a directory!".format(path)) + + +class Directory: + def __init__(self, path): + self._path = path + 
+    def get_path(self):
+        return self._path
+
+    def get_file(self, name):
+        path = os.path.join(self._path, name)
+        if os.path.exists(path) and not os.path.isfile(path):
+            raise Exception("The path '{}' should be a file!".format(path))
+        return (path, os.path.isfile(path))
+
+
+class RecursiveDict:
+    def __init__(self, data={}):
+        self._data = copy.deepcopy(data)
+
+    def _get_obj_and_key(self, *args):
+        key = args[-1]
+        obj = self._data
+        for item in args[:-1]:
+            if item not in obj:
+                obj[item] = {}
+            obj = obj[item]
+        return (obj, key)
+
+    def get_value(self, *args):
+        obj, key = self._get_obj_and_key(*args)
+        return obj.get(key, None)
+
+    def set_value(self, *args, value=None):
+        obj, key = self._get_obj_and_key(*args)
+        obj[key] = value
+
+    def get_data(self):
+        return copy.deepcopy(self._data)
+
+
+class Cache:
+    def __init__(self):
+        self._directory = os.path.join(SCRIPT_DIR, ".cache")
+        ensure_directory(self._directory)
+        self._config = os.path.join(self._directory, "config.json")
+
+    def cache_directory(self, *args):
+        if len(args) == 0:
+            raise ValueError("At least one directory level must be supplied!")
+        path = os.path.join(self._directory, *args)
+        ensure_directory(path)
+        return Directory(path)
+
+    def load_config(self):
+        if not os.path.isfile(self._config):
+            return RecursiveDict()
+        with open(self._config) as f:
+            return RecursiveDict(json.load(f))
+
+    def save_config(self, config):
+        with open(self._config, "w") as f:
+            json.dump(config.get_data(), f)
diff --git a/tests/mgbench/log.py b/tests/mgbench/log.py
new file mode 100644
index 000000000..6c9a8b74b
--- /dev/null
+++ b/tests/mgbench/log.py
@@ -0,0 +1,31 @@
+COLOR_GRAY = 0
+COLOR_RED = 1
+COLOR_GREEN = 2
+COLOR_YELLOW = 3
+COLOR_BLUE = 4
+COLOR_VIOLET = 5
+COLOR_CYAN = 6
+
+
+def log(color, *args):
+    print("\033[1;3{}m~~".format(color), *args, "~~\033[0m")
+
+
+def init(*args):
+    log(COLOR_BLUE, *args)
+
+
+def info(*args):
+    log(COLOR_CYAN, *args)
+
+
+def success(*args):
+    log(COLOR_GREEN, *args)
+
+
+def warning(*args):
+    log(COLOR_YELLOW, *args)
+
+
+def error(*args):
+    log(COLOR_RED, *args)
diff --git a/tests/mgbench/runners.py b/tests/mgbench/runners.py
new file mode 100644
index 000000000..5a0d60001
--- /dev/null
+++ b/tests/mgbench/runners.py
@@ -0,0 +1,148 @@
+import atexit
+import json
+import os
+import re
+import subprocess
+import tempfile
+import time
+
+
+def wait_for_server(port, delay=0.1):
+    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
+    while subprocess.call(cmd) != 0:
+        time.sleep(0.01)
+    time.sleep(delay)
+
+
+def _convert_args_to_flags(*args, **kwargs):
+    flags = list(args)
+    for key, value in kwargs.items():
+        key = "--" + key.replace("_", "-")
+        if type(value) == bool:
+            flags.append(key + "=" + str(value).lower())
+        else:
+            flags.append(key)
+            flags.append(str(value))
+    return flags
+
+
+def _get_usage(pid):
+    total_cpu = 0
+    with open("/proc/{}/stat".format(pid)) as f:
+        total_cpu = (sum(map(int, f.read().split(")")[1].split()[11:15])) /
+                     os.sysconf(os.sysconf_names["SC_CLK_TCK"]))
+    peak_rss = 0
+    with open("/proc/{}/status".format(pid)) as f:
+        for row in f:
+            tmp = row.split()
+            if tmp[0] == "VmHWM:":
+                peak_rss = int(tmp[1]) * 1024
+    return {"cpu": total_cpu, "memory": peak_rss}
+
+
+class Memgraph:
+    def __init__(self, memgraph_binary, temporary_dir, properties_on_edges):
+        self._memgraph_binary = memgraph_binary
+        self._directory = tempfile.TemporaryDirectory(dir=temporary_dir)
+        self._properties_on_edges = properties_on_edges
+        self._proc_mg = None
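+        # Register cleanup both here and via __del__ so the database child
+        # process is terminated even if this object is never collected.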
+        atexit.register(self._cleanup)
+
+        # Determine the Memgraph version.
+        ret = subprocess.run([memgraph_binary, "--version"],
+                             stdout=subprocess.PIPE, check=True)
+        version = re.search(r"[0-9]+\.[0-9]+\.[0-9]+",
+                            ret.stdout.decode("utf-8")).group(0)
+        self._memgraph_version = tuple(map(int, version.split(".")))
+
+    def __del__(self):
+        self._cleanup()
+        atexit.unregister(self._cleanup)
+
+    def _get_args(self, **kwargs):
+        data_directory = os.path.join(self._directory.name, "memgraph")
+        if self._memgraph_version >= (0, 50, 0):
+            kwargs["data_directory"] = data_directory
+        else:
+            kwargs["durability_directory"] = data_directory
+        if self._memgraph_version >= (0, 50, 0):
+            kwargs["storage_properties_on_edges"] = self._properties_on_edges
+        else:
+            assert self._properties_on_edges, \
+                "Older versions of Memgraph can't disable properties on edges!"
+        kwargs["min_log_level"] = 1
+        return _convert_args_to_flags(self._memgraph_binary, **kwargs)
+
+    def _start(self, **kwargs):
+        if self._proc_mg is not None:
+            raise Exception("The database process is already running!")
+        args = self._get_args(**kwargs)
+        self._proc_mg = subprocess.Popen(args, stdout=subprocess.DEVNULL)
+        time.sleep(0.2)
+        if self._proc_mg.poll() is not None:
+            self._proc_mg = None
+            raise Exception("The database process died prematurely!")
+        wait_for_server(7687)
+        ret = self._proc_mg.poll()
+        assert ret is None, "The database process died prematurely " \
+            "({})!".format(ret)
+
+    def _cleanup(self):
+        if self._proc_mg is None:
+            return 0
+        usage = _get_usage(self._proc_mg.pid)
+        self._proc_mg.terminate()
+        ret = self._proc_mg.wait()
+        self._proc_mg = None
+        return ret, usage
+
+    def start_preparation(self):
+        if self._memgraph_version >= (0, 50, 0):
+            self._start(storage_snapshot_on_exit=True)
+        else:
+            self._start(snapshot_on_exit=True)
+
+    def start_benchmark(self):
+        # TODO: support custom benchmarking config files!
+        if self._memgraph_version >= (0, 50, 0):
+            self._start(storage_recover_on_startup=True)
+        else:
+            self._start(db_recover_on_startup=True)
+
+    def stop(self):
+        ret, usage = self._cleanup()
+        assert ret == 0, "The database process exited with a non-zero " \
+            "status ({})!".format(ret)
+        return usage
+
+
+class Client:
+    def __init__(self, client_binary, temporary_directory):
+        self._client_binary = client_binary
+        self._directory = tempfile.TemporaryDirectory(dir=temporary_directory)
+
+    def _get_args(self, **kwargs):
+        return _convert_args_to_flags(self._client_binary, **kwargs)
+
+    def execute(self, queries=None, file_path=None, num_workers=1):
+        if (queries is None and file_path is None) or \
+                (queries is not None and file_path is not None):
+            raise ValueError(
+                "Exactly one of `queries` and `file_path` must be specified!")
+
+        # TODO: check `file_path.endswith(".json")` to support advanced
+        # input queries
+
+        queries_json = False
+        if queries is not None:
+            queries_json = True
+            file_path = os.path.join(self._directory.name, "queries.json")
+            with open(file_path, "w") as f:
+                for query in queries:
+                    json.dump(query, f)
+                    f.write("\n")
+
+        args = self._get_args(input=file_path, num_workers=num_workers,
+                              queries_json=queries_json)
+        ret = subprocess.run(args, stdout=subprocess.PIPE, check=True)
+        data = ret.stdout.decode("utf-8").strip().split("\n")
+        return list(map(json.loads, data))
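+
+# A minimal usage sketch (binaries assumed at the default build paths),
+# mirroring what benchmark.py does during the import phase:
+#
+#   memgraph = Memgraph(helpers.get_binary_path("memgraph"), "/tmp", True)
+#   client = Client(helpers.get_binary_path("tests/mgbench/client"), "/tmp")
+#   memgraph.start_preparation()
+#   ret = client.execute(queries=[("RETURN 1", {})], num_workers=1)
+#   usage = memgraph.stop()  # {"cpu": <seconds>, "memory": <peak RSS bytes>}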