Initial implementation of mgbench benchmarks (#4)
parent e0ffc533b9
commit 4c0fc11f69
@@ -24,3 +24,6 @@ add_subdirectory(integration)
 
 # feature benchmark test binaries
 add_subdirectory(feature_benchmark)
+
+# mgbench benchmark test binaries
+add_subdirectory(mgbench)

tests/mgbench/.gitignore (new vendored file, 1 line)
@@ -0,0 +1 @@
.cache

tests/mgbench/CMakeLists.txt (new file, 5 lines)
@@ -0,0 +1,5 @@
set(test_prefix memgraph__mgbench__)

add_executable(${test_prefix}client client.cpp)
set_target_properties(${test_prefix}client PROPERTIES OUTPUT_NAME client)
target_link_libraries(${test_prefix}client mg-communication json)

tests/mgbench/benchmark.py (new executable file, 279 lines)
@@ -0,0 +1,279 @@
#!/usr/bin/env python3

import argparse
import collections
import copy
import fnmatch
import inspect
import json
import multiprocessing
import random
import sys

import datasets
import log
import helpers
import runners


def get_queries(gen, count):
    # Make the generator deterministic.
    random.seed(gen.__name__)
    # Generate queries.
    ret = []
    for i in range(count):
        ret.append(gen())
    return ret


def match_patterns(dataset, variant, group, test, is_default_variant,
                   patterns):
    for pattern in patterns:
        verdict = [fnmatch.fnmatchcase(dataset, pattern[0])]
        if pattern[1] != "":
            verdict.append(fnmatch.fnmatchcase(variant, pattern[1]))
        else:
            verdict.append(is_default_variant)
        verdict.append(fnmatch.fnmatchcase(group, pattern[2]))
        verdict.append(fnmatch.fnmatchcase(test, pattern[3]))
        if all(verdict):
            return True
    return False


def filter_benchmarks(generators, patterns):
    patterns = copy.deepcopy(patterns)
    for i in range(len(patterns)):
        pattern = patterns[i].split("/")
        if len(pattern) > 4 or len(pattern) == 0:
            raise Exception("Invalid benchmark description '" +
                            patterns[i] + "'!")
        pattern.extend(["", "*", "*"][len(pattern) - 1:])
        patterns[i] = pattern
    filtered = []
    for dataset in sorted(generators.keys()):
        generator, tests = generators[dataset]
        for variant in generator.VARIANTS:
            is_default_variant = variant == generator.DEFAULT_VARIANT
            current = collections.defaultdict(list)
            for group in tests:
                for test_name, test_func in tests[group]:
                    if match_patterns(dataset, variant, group, test_name,
                                      is_default_variant, patterns):
                        current[group].append((test_name, test_func))
            if len(current) > 0:
                filtered.append((generator(variant), dict(current)))
    return filtered


# Parse options.
parser = argparse.ArgumentParser(
    description="Memgraph benchmark executor.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("benchmarks", nargs="*", default="",
                    help="descriptions of benchmarks that should be run; "
                    "multiple descriptions can be specified to run multiple "
                    "benchmarks; the description is specified as "
                    "dataset/variant/group/test; Unix shell-style wildcards "
                    "can be used in the descriptions; variant, group and test "
                    "are optional and they can be left out; the default "
                    "variant is '' which selects the default dataset variant; "
                    "the default group is '*' which selects all groups; the "
                    "default test is '*' which selects all tests")
parser.add_argument("--memgraph-binary",
                    default=helpers.get_binary_path("memgraph"),
                    help="Memgraph binary used for benchmarking")
parser.add_argument("--client-binary",
                    default=helpers.get_binary_path("tests/mgbench/client"),
                    help="client binary used for benchmarking")
parser.add_argument("--num-workers-for-import", type=int,
                    default=multiprocessing.cpu_count() // 2,
                    help="number of workers used to import the dataset")
parser.add_argument("--num-workers-for-benchmark", type=int,
                    default=1,
                    help="number of workers used to execute the benchmark")
parser.add_argument("--single-threaded-runtime-sec", type=int,
                    default=10,
                    help="single threaded duration of each test")
parser.add_argument("--no-load-query-counts", action="store_true",
                    help="disable loading of cached query counts")
parser.add_argument("--no-save-query-counts", action="store_true",
                    help="disable storing of cached query counts")
parser.add_argument("--export-results", default="",
                    help="file path into which results should be exported")
parser.add_argument("--temporary-directory", default="/tmp",
                    help="directory path where temporary data should "
                    "be stored")
parser.add_argument("--no-properties-on-edges", action="store_true",
                    help="disable properties on edges")
args = parser.parse_args()

# Detect available datasets.
generators = {}
for key in dir(datasets):
    if key.startswith("_"):
        continue
    dataset = getattr(datasets, key)
    if not inspect.isclass(dataset) or dataset == datasets.Dataset or \
            not issubclass(dataset, datasets.Dataset):
        continue
    tests = collections.defaultdict(list)
    for funcname in dir(dataset):
        if not funcname.startswith("benchmark__"):
            continue
        group, test = funcname.split("__")[1:]
        tests[group].append((test, funcname))
    generators[dataset.NAME] = (dataset, dict(tests))
    if dataset.PROPERTIES_ON_EDGES and args.no_properties_on_edges:
        raise Exception("The \"{}\" dataset requires properties on edges, "
                        "but you have disabled them!".format(dataset.NAME))

# List datasets if there is no specified dataset.
if len(args.benchmarks) == 0:
    log.init("Available tests")
    for name in sorted(generators.keys()):
        print("Dataset:", name)
        dataset, tests = generators[name]
        print("    Variants:", ", ".join(dataset.VARIANTS),
              "(default: " + dataset.DEFAULT_VARIANT + ")")
        for group in sorted(tests.keys()):
            print("    Group:", group)
            for test_name, test_func in tests[group]:
                print("        Test:", test_name)
    sys.exit(0)

# Create cache, config and results objects.
cache = helpers.Cache()
if not args.no_load_query_counts:
    config = cache.load_config()
else:
    config = helpers.RecursiveDict()
results = helpers.RecursiveDict()

# Filter out the generators.
benchmarks = filter_benchmarks(generators, args.benchmarks)

# Run all specified benchmarks.
for dataset, tests in benchmarks:
    log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(),
             "dataset")
    dataset.prepare(cache.cache_directory("datasets", dataset.NAME,
                                          dataset.get_variant()))

    # Prepare runners and import the dataset.
    memgraph = runners.Memgraph(args.memgraph_binary, args.temporary_directory,
                                not args.no_properties_on_edges)
    client = runners.Client(args.client_binary, args.temporary_directory)
    memgraph.start_preparation()
    ret = client.execute(file_path=dataset.get_file(),
                         num_workers=args.num_workers_for_import)
    usage = memgraph.stop()

    # Display import statistics.
    print()
    for row in ret:
        print("Executed", row["count"], "queries in", row["duration"],
              "seconds using", row["num_workers"],
              "workers with a total throughput of", row["throughput"],
              "queries/second.")
    print()
    print("The database used", usage["cpu"],
          "seconds of CPU time and peaked at",
          usage["memory"] / 1024 / 1024, "MiB of RAM.")

    # Save import results.
    import_key = [dataset.NAME, dataset.get_variant(), "__import__"]
    results.set_value(*import_key, value={"client": ret, "database": usage})

    # TODO: cache import data

    # Run all benchmarks in all available groups.
    for group in sorted(tests.keys()):
        for test, funcname in tests[group]:
            log.info("Running test:", "{}/{}".format(group, test))
            func = getattr(dataset, funcname)

            # Get number of queries to execute.
            # TODO: implement minimum number of queries, `max(10, num_workers)`
            config_key = [dataset.NAME, dataset.get_variant(), group, test]
            cached_count = config.get_value(*config_key)
            if cached_count is None:
                print("Determining the number of queries necessary for",
                      args.single_threaded_runtime_sec,
                      "seconds of single-threaded runtime...")
                # First run to prime the query caches.
                memgraph.start_benchmark()
                client.execute(queries=get_queries(func, 1), num_workers=1)
                # Get a sense of the runtime.
                count = 1
                while True:
                    ret = client.execute(queries=get_queries(func, count),
                                         num_workers=1)
                    duration = ret[0]["duration"]
                    should_execute = int(args.single_threaded_runtime_sec /
                                         (duration / count))
                    print("executed_queries={}, total_duration={}, "
                          "query_duration={}, estimated_count={}".format(
                              count, duration, duration / count,
                              should_execute))
                    # We don't have to execute the next iteration when
                    # `should_execute` becomes the same order of magnitude as
                    # `count * 10`.
                    if should_execute / (count * 10) < 10:
                        count = should_execute
                        break
                    else:
                        count = count * 10
                memgraph.stop()
                config.set_value(*config_key, value={
                    "count": count,
                    "duration": args.single_threaded_runtime_sec})
            else:
                print("Using cached query count of", cached_count["count"],
                      "queries for", cached_count["duration"],
                      "seconds of single-threaded runtime.")
                count = int(cached_count["count"] *
                            args.single_threaded_runtime_sec /
                            cached_count["duration"])

            # Benchmark run.
            print("Sample query:", get_queries(func, 1)[0][0])
            print("Executing benchmark with", count, "queries that should "
                  "yield a single-threaded runtime of",
                  args.single_threaded_runtime_sec, "seconds.")
            print("Queries are executed using", args.num_workers_for_benchmark,
                  "concurrent clients.")
            memgraph.start_benchmark()
            ret = client.execute(queries=get_queries(func, count),
                                 num_workers=args.num_workers_for_benchmark)[0]
            usage = memgraph.stop()
            ret["database"] = usage

            # Output summary.
            print()
            print("Executed", ret["count"], "queries in",
                  ret["duration"], "seconds.")
            print("Queries have been retried", ret["retries"], "times.")
            print("Database used {:.3f} seconds of CPU time.".format(
                usage["cpu"]))
            print("Database peaked at {:.3f} MiB of memory.".format(
                usage["memory"] / 1024.0 / 1024.0))
            print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min",
                                                       "avg", "max"))
            metadata = ret["metadata"]
            for key in sorted(metadata.keys()):
                print("{name:>30}: {minimum:>20.06f} {average:>20.06f} "
                      "{maximum:>20.06f}".format(name=key, **metadata[key]))
            log.success("Throughput: {:02f} QPS".format(ret["throughput"]))

            # Save results.
            results_key = [dataset.NAME, dataset.get_variant(), group, test]
            results.set_value(*results_key, value=ret)

# Save configuration.
if not args.no_save_query_counts:
    cache.save_config(config)

# Export results.
if args.export_results:
    with open(args.export_results, "w") as f:
        json.dump(results.get_data(), f)
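
Note: the `benchmarks` positional argument accepts Unix shell-style patterns of the form dataset/variant/group/test, which filter_benchmarks() and match_patterns() above normalize and match. A minimal sketch with example values (the special handling of the empty default variant is omitted here):

    import fnmatch

    description = "pokec/small/arango/*"
    pattern = description.split("/")
    # Pad a short description to [dataset, variant, group, test].
    pattern.extend(["", "*", "*"][len(pattern) - 1:])

    candidate = ("pokec", "small", "arango", "single_vertex_read")
    verdict = [fnmatch.fnmatchcase(value, rule)
               for value, rule in zip(candidate, pattern)]
    print(all(verdict))  # True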

tests/mgbench/client.cpp (new file, 299 lines)
@@ -0,0 +1,299 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <limits>
#include <map>
#include <string>
#include <thread>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <json/json.hpp>

#include "communication/bolt/client.hpp"
#include "communication/bolt/v1/value.hpp"
#include "utils/exceptions.hpp"
#include "utils/string.hpp"
#include "utils/timer.hpp"

DEFINE_string(address, "127.0.0.1", "Server address.");
DEFINE_int32(port, 7687, "Server port.");
DEFINE_string(username, "", "Username for the database.");
DEFINE_string(password, "", "Password for the database.");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");

DEFINE_uint64(num_workers, 1,
              "Number of workers that should be used to concurrently execute "
              "the supplied queries.");
DEFINE_uint64(max_retries, 50, "Maximum number of retries for each query.");
DEFINE_bool(
    queries_json, false,
    "Set to true to load all queries as a single JSON encoded list. Each item "
    "in the list should contain another list whose first element is the query "
    "that should be executed and the second element should be a dictionary of "
    "query parameters for that query.");

DEFINE_string(input, "", "Input file. By default stdin is used.");
DEFINE_string(output, "", "Output file. By default stdout is used.");

std::pair<std::map<std::string, communication::bolt::Value>, uint64_t>
ExecuteNTimesTillSuccess(
    communication::bolt::Client *client, const std::string &query,
    const std::map<std::string, communication::bolt::Value> &params,
    int max_attempts) {
  for (uint64_t i = 0; i < max_attempts; ++i) {
    try {
      auto ret = client->Execute(query, params);
      return {std::move(ret.metadata), i};
    } catch (const utils::BasicException &e) {
      if (i == max_attempts - 1) {
        LOG(FATAL) << "Could not execute query '" << query << "' "
                   << max_attempts << " times! Error message: " << e.what();
      } else {
        continue;
      }
    }
  }
  LOG(FATAL) << "Could not execute query '" << query << "' " << max_attempts
             << " times!";
}

communication::bolt::Value JsonToBoltValue(const nlohmann::json &data) {
  switch (data.type()) {
    case nlohmann::json::value_t::null:
      return {};
    case nlohmann::json::value_t::boolean:
      return {data.get<bool>()};
    case nlohmann::json::value_t::string:
      return {data.get<std::string>()};
    case nlohmann::json::value_t::number_integer:
      return {data.get<int64_t>()};
    case nlohmann::json::value_t::number_unsigned:
      return {static_cast<int64_t>(data.get<uint64_t>())};
    case nlohmann::json::value_t::number_float:
      return {data.get<double>()};
    case nlohmann::json::value_t::array: {
      std::vector<communication::bolt::Value> vec;
      vec.reserve(data.size());
      for (const auto &item : data.get<nlohmann::json::array_t>()) {
        vec.emplace_back(JsonToBoltValue(item));
      }
      return {std::move(vec)};
    }
    case nlohmann::json::value_t::object: {
      std::map<std::string, communication::bolt::Value> map;
      for (const auto &item : data.get<nlohmann::json::object_t>()) {
        map.emplace(item.first, JsonToBoltValue(item.second));
      }
      return {std::move(map)};
    }
    case nlohmann::json::value_t::discarded:
      LOG(FATAL) << "Unexpected JSON type!";
  }
}

class Metadata final {
 private:
  struct Record {
    uint64_t count{0};
    double average{0.0};
    double minimum{std::numeric_limits<double>::infinity()};
    double maximum{-std::numeric_limits<double>::infinity()};
  };

 public:
  void Append(const std::map<std::string, communication::bolt::Value> &values) {
    for (const auto &item : values) {
      if (!item.second.IsInt() && !item.second.IsDouble()) continue;
      auto [it, emplaced] = storage_.emplace(item.first, Record());
      auto &record = it->second;
      double value = 0.0;
      if (item.second.IsInt()) {
        value = item.second.ValueInt();
      } else {
        value = item.second.ValueDouble();
      }
      ++record.count;
      record.average += value;
      record.minimum = std::min(record.minimum, value);
      record.maximum = std::max(record.maximum, value);
    }
  }

  nlohmann::json Export() {
    nlohmann::json data = nlohmann::json::object();
    for (const auto &item : storage_) {
      nlohmann::json row = nlohmann::json::object();
      row["average"] = item.second.average / item.second.count;
      row["minimum"] = item.second.minimum;
      row["maximum"] = item.second.maximum;
      data[item.first] = row;
    }
    return data;
  }

  Metadata &operator+=(const Metadata &other) {
    for (const auto &item : other.storage_) {
      auto [it, emplaced] = storage_.emplace(item.first, Record());
      auto &record = it->second;
      record.count += item.second.count;
      record.average += item.second.average;
      record.minimum = std::min(record.minimum, item.second.minimum);
      record.maximum = std::max(record.maximum, item.second.maximum);
    }
    return *this;
  }

 private:
  std::map<std::string, Record> storage_;
};

void Execute(
    const std::vector<std::pair<
        std::string, std::map<std::string, communication::bolt::Value>>>
        &queries,
    std::ostream *stream) {
  std::vector<std::thread> threads;
  threads.reserve(FLAGS_num_workers);

  std::vector<uint64_t> worker_retries(FLAGS_num_workers, 0);
  std::vector<Metadata> worker_metadata(FLAGS_num_workers, Metadata());
  std::vector<double> worker_duration(FLAGS_num_workers, 0.0);

  // Start workers and execute queries.
  auto size = queries.size();
  std::atomic<bool> run(false);
  std::atomic<uint64_t> ready(0);
  std::atomic<uint64_t> position(0);
  for (int worker = 0; worker < FLAGS_num_workers; ++worker) {
    threads.push_back(std::thread([&, worker]() {
      io::network::Endpoint endpoint(FLAGS_address, FLAGS_port);
      communication::ClientContext context(FLAGS_use_ssl);
      communication::bolt::Client client(&context);
      client.Connect(endpoint, FLAGS_username, FLAGS_password);

      ready.fetch_add(1, std::memory_order_acq_rel);
      while (!run.load(std::memory_order_acquire))
        ;

      auto &retries = worker_retries[worker];
      auto &metadata = worker_metadata[worker];
      auto &duration = worker_duration[worker];
      utils::Timer timer;
      while (true) {
        auto pos = position.fetch_add(1, std::memory_order_acq_rel);
        if (pos >= size) break;
        const auto &query = queries[pos];
        auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second,
                                            FLAGS_max_retries);
        retries += ret.second;
        metadata.Append(ret.first);
      }
      duration = timer.Elapsed().count();
      client.Close();
    }));
  }

  // Synchronize workers and collect runtime.
  while (ready.load(std::memory_order_acquire) < FLAGS_num_workers)
    ;
  run.store(true, std::memory_order_release);
  for (int i = 0; i < FLAGS_num_workers; ++i) {
    threads[i].join();
  }

  // Create and output summary.
  Metadata final_metadata;
  uint64_t final_retries = 0;
  double final_duration = 0.0;
  for (int i = 0; i < FLAGS_num_workers; ++i) {
    final_metadata += worker_metadata[i];
    final_retries += worker_retries[i];
    final_duration += worker_duration[i];
  }
  final_duration /= FLAGS_num_workers;
  nlohmann::json summary = nlohmann::json::object();
  summary["count"] = queries.size();
  summary["duration"] = final_duration;
  summary["throughput"] = static_cast<double>(queries.size()) / final_duration;
  summary["retries"] = final_retries;
  summary["metadata"] = final_metadata.Export();
  summary["num_workers"] = FLAGS_num_workers;
  (*stream) << summary.dump() << std::endl;
}

int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  communication::Init();

  std::ifstream ifile;
  std::istream *istream{&std::cin};
  if (FLAGS_input != "") {
    CHECK(std::filesystem::is_regular_file(FLAGS_input))
        << "Input file isn't a regular file or it doesn't exist!";
    ifile.open(FLAGS_input);
    CHECK(ifile) << "Couldn't open input file!";
    istream = &ifile;
  }

  std::ofstream ofile;
  std::ostream *ostream{&std::cout};
  if (FLAGS_output != "") {
    ofile.open(FLAGS_output);
    CHECK(ofile) << "Couldn't open output file!";
    ostream = &ofile;
  }

  std::vector<
      std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
      queries;
  if (!FLAGS_queries_json) {
    // Load simple queries.
    std::string query;
    while (std::getline(*istream, query)) {
      auto trimmed = utils::Trim(query);
      if (trimmed == "" || trimmed == ";") {
        Execute(queries, ostream);
        queries.clear();
        continue;
      }
      queries.emplace_back(query,
                           std::map<std::string, communication::bolt::Value>{});
    }
  } else {
    // Load advanced queries.
    std::string row;
    while (std::getline(*istream, row)) {
      auto data = nlohmann::json::parse(row);
      CHECK(data.is_array())
          << "Each item of the loaded JSON queries must be an array!";
      if (data.size() == 0) {
        Execute(queries, ostream);
        queries.clear();
        continue;
      }
      CHECK(data.size() == 2)
          << "Each item of the loaded JSON queries that has "
             "data must be an array of length 2!";
      const auto &query = data[0];
      const auto &param = data[1];
      CHECK(query.is_string() && param.is_object())
          << "The query must be a string and the parameters must be a "
             "dictionary!";
      auto bolt_param = JsonToBoltValue(param);
      CHECK(bolt_param.IsMap()) << "The Bolt parameters must be a map!";
      queries.emplace_back(query, std::move(bolt_param.ValueMap()));
    }
  }
  Execute(queries, ostream);

  return 0;
}
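
Note: with --queries-json=true the client expects one JSON array per input line of the form [query, params]; an empty array flushes the queries collected so far as one batch. A minimal sketch of producing such an input file (the file path and queries are illustrative; runners.py below writes the same format):

    import json

    queries = [
        ["MATCH (n:User {id: $id}) RETURN n", {"id": 1}],
        ["CREATE (n:UserTemp {id: $id}) RETURN n", {"id": 2}],
    ]
    with open("/tmp/queries.json", "w") as f:
        for query in queries:
            json.dump(query, f)
            f.write("\n")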
175
tests/mgbench/compare_results.py
Executable file
175
tests/mgbench/compare_results.py
Executable file
@ -0,0 +1,175 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import json
|
||||
|
||||
|
||||
FIELDS = [
|
||||
{
|
||||
"name": "throughput",
|
||||
"positive_diff_better": True,
|
||||
"scaling": 1,
|
||||
"unit": "QPS",
|
||||
"diff_treshold": 0.05, # 5%
|
||||
},
|
||||
{
|
||||
"name": "duration",
|
||||
"positive_diff_better": False,
|
||||
"scaling": 1,
|
||||
"unit": "s",
|
||||
},
|
||||
{
|
||||
"name": "parsing_time",
|
||||
"positive_diff_better": False,
|
||||
"scaling": 1000,
|
||||
"unit": "ms",
|
||||
},
|
||||
{
|
||||
"name": "planning_time",
|
||||
"positive_diff_better": False,
|
||||
"scaling": 1000,
|
||||
"unit": "ms",
|
||||
},
|
||||
{
|
||||
"name": "plan_execution_time",
|
||||
"positive_diff_better": False,
|
||||
"scaling": 1000,
|
||||
"unit": "ms",
|
||||
},
|
||||
{
|
||||
"name": "memory",
|
||||
"positive_diff_better": False,
|
||||
"scaling": 1 / 1024 / 1024,
|
||||
"unit": "MiB",
|
||||
"diff_treshold": 0.02, # 2%
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def load_results(fname):
|
||||
with open(fname) as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def compute_diff(value_from, value_to):
|
||||
if value_from is None:
|
||||
return {"value": value_to}
|
||||
diff = (value_to - value_from) / value_from
|
||||
return {"value": value_to, "diff": diff}
|
||||
|
||||
|
||||
def recursive_get(data, *args, value=None):
|
||||
for arg in args:
|
||||
if arg not in data:
|
||||
return value
|
||||
data = data[arg]
|
||||
return data
|
||||
|
||||
|
||||
def compare_results(results_from, results_to, fields):
|
||||
ret = {}
|
||||
for dataset, variants in results_to.items():
|
||||
for variant, groups in variants.items():
|
||||
for group, scenarios in groups.items():
|
||||
if group == "__import__":
|
||||
continue
|
||||
for scenario, summary_to in scenarios.items():
|
||||
summary_from = recursive_get(
|
||||
results_from, dataset, variant, group, scenario,
|
||||
value={})
|
||||
if len(summary_from) > 0 and \
|
||||
summary_to["count"] != summary_from["count"] or \
|
||||
summary_to["num_workers"] != \
|
||||
summary_to["num_workers"]:
|
||||
raise Exception("Incompatible results!")
|
||||
testcode = "/".join([dataset, variant, group, scenario,
|
||||
"{:02d}".format(
|
||||
summary_to["num_workers"])])
|
||||
row = {}
|
||||
performance_changed = False
|
||||
for field in fields:
|
||||
key = field["name"]
|
||||
if key in summary_to:
|
||||
row[key] = compute_diff(
|
||||
summary_from.get(key, None),
|
||||
summary_to[key])
|
||||
elif key in summary_to["database"]:
|
||||
row[key] = compute_diff(
|
||||
recursive_get(summary_from, "database", key,
|
||||
value=None),
|
||||
summary_to["database"][key])
|
||||
else:
|
||||
row[key] = compute_diff(
|
||||
recursive_get(summary_from, "metadata", key,
|
||||
"average", value=None),
|
||||
summary_to["metadata"][key]["average"])
|
||||
if "diff" not in row[key] or \
|
||||
("diff_treshold" in field and
|
||||
abs(row[key]["diff"]) >=
|
||||
field["diff_treshold"]):
|
||||
performance_changed = True
|
||||
if performance_changed:
|
||||
ret[testcode] = row
|
||||
return ret
|
||||
|
||||
|
||||
def generate_remarkup(fields, data):
|
||||
ret = "==== Benchmark summary: ====\n\n"
|
||||
if len(data) > 0:
|
||||
ret += "<table>\n"
|
||||
ret += " <tr>\n"
|
||||
ret += " <th>Testcode</th>\n"
|
||||
ret += "\n".join(map(lambda x: " <th>{}</th>".format(
|
||||
x["name"].replace("_", " ").capitalize()), fields)) + "\n"
|
||||
ret += " </tr>\n"
|
||||
for testcode in sorted(data.keys()):
|
||||
ret += " <tr>\n"
|
||||
ret += " <td>{}</td>\n".format(testcode)
|
||||
for field in fields:
|
||||
result = data[testcode][field["name"]]
|
||||
value = result["value"] * field["scaling"]
|
||||
if "diff" in result:
|
||||
diff = result["diff"]
|
||||
arrow = "arrow-up" if diff >= 0 else "arrow-down"
|
||||
if not (field["positive_diff_better"] ^ (diff >= 0)):
|
||||
color = "green"
|
||||
else:
|
||||
color = "red"
|
||||
sign = "{{icon {} color={}}}".format(arrow, color)
|
||||
ret += " <td>{:.3f}{} //({:+.2%})// {}</td>\n".format(
|
||||
value, field["unit"], diff, sign)
|
||||
else:
|
||||
ret += " <td>{:.3f}{} //(new)// " \
|
||||
"{{icon plus color=blue}}</td>\n".format(
|
||||
value, field["unit"])
|
||||
ret += " </tr>\n"
|
||||
ret += "</table>\n"
|
||||
else:
|
||||
ret += "No performance change detected.\n"
|
||||
return ret
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Compare results of multiple benchmark runs.")
|
||||
parser.add_argument("--compare", action="append", nargs=2,
|
||||
metavar=("from", "to"),
|
||||
help="compare results between `from` and `to` files")
|
||||
parser.add_argument("--output", default="", help="output file name")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.compare is None or len(args.compare) == 0:
|
||||
raise Exception("You must specify at least one pair of files!")
|
||||
|
||||
data = {}
|
||||
for file_from, file_to in args.compare:
|
||||
results_from = load_results(file_from)
|
||||
results_to = load_results(file_to)
|
||||
data.update(compare_results(results_from, results_to, FIELDS))
|
||||
|
||||
remarkup = generate_remarkup(FIELDS, data)
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(remarkup)
|
||||
else:
|
||||
print(remarkup, end="")
|
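
Note: the script takes one or more --compare FROM TO pairs of exported result files (the file names are up to the caller) and writes the remarkup summary to --output or stdout. For reference, compute_diff() reports the new value together with a relative difference:

    compute_diff(100.0, 95.0)  # -> {"value": 95.0, "diff": -0.05}
    compute_diff(None, 95.0)   # -> {"value": 95.0}  (no baseline, marked as new)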

tests/mgbench/datasets.py (new file, 279 lines)
@@ -0,0 +1,279 @@
import random

import helpers


# Base dataset class used as a template to create each individual dataset. All
# common logic is handled here.
class Dataset:
    # Name of the dataset.
    NAME = "Base dataset"
    # List of all variants of the dataset that exist.
    VARIANTS = ["default"]
    # One of the available variants that should be used as the default variant.
    DEFAULT_VARIANT = "default"
    # List of query files that should be used to import the dataset.
    FILES = {
        "default": "/foo/bar",
    }
    # List of query file URLs that should be used to import the dataset.
    URLS = None
    # Number of vertices/edges for each variant.
    SIZES = {
        "default": {"vertices": 0, "edges": 0},
    }
    # Indicates whether the dataset has properties on edges.
    PROPERTIES_ON_EDGES = False

    def __init__(self, variant=None):
        """
        Accepts a `variant` variable that indicates which variant
        of the dataset should be executed.
        """
        if variant is None:
            variant = self.DEFAULT_VARIANT
        if variant not in self.VARIANTS:
            raise ValueError("Invalid test variant!")
        if (self.FILES and variant not in self.FILES) and \
                (self.URLS and variant not in self.URLS):
            raise ValueError("The variant doesn't have a defined URL or "
                             "file path!")
        if variant not in self.SIZES:
            raise ValueError("The variant doesn't have a defined dataset "
                             "size!")
        self._variant = variant
        if self.FILES is not None:
            self._file = self.FILES.get(variant, None)
        else:
            self._file = None
        if self.URLS is not None:
            self._url = self.URLS.get(variant, None)
        else:
            self._url = None
        self._size = self.SIZES[variant]
        if "vertices" not in self._size or "edges" not in self._size:
            raise ValueError("The size defined for this variant doesn't "
                             "have the number of vertices and/or edges!")
        self._num_vertices = self._size["vertices"]
        self._num_edges = self._size["edges"]

    def prepare(self, directory):
        if self._file is not None:
            print("Using dataset file:", self._file)
            return
        # TODO: add support for JSON datasets
        cached_input, exists = directory.get_file("dataset.cypher")
        if not exists:
            print("Downloading dataset file:", self._url)
            downloaded_file = helpers.download_file(
                self._url, directory.get_path())
            print("Unpacking and caching file:", downloaded_file)
            helpers.unpack_and_move_file(downloaded_file, cached_input)
        print("Using cached dataset file:", cached_input)
        self._file = cached_input

    def get_variant(self):
        """Returns the current variant of the dataset."""
        return self._variant

    def get_file(self):
        """
        Returns path to the file that contains dataset creation queries.
        """
        return self._file

    def get_size(self):
        """Returns number of vertices/edges for the current variant."""
        return self._size

    # All tests should be query generator functions that output all of the
    # queries that should be executed by the runner. The functions should be
    # named `benchmark__GROUPNAME__TESTNAME` and should not accept any
    # arguments.


class Pokec(Dataset):
    NAME = "pokec"
    VARIANTS = ["small", "medium", "large"]
    DEFAULT_VARIANT = "small"
    FILES = None
    URLS = {
        "small": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_small.setup.cypher",
        "medium": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_medium.setup.cypher",
        "large": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_large.setup.cypher.gz",
    }
    SIZES = {
        "small": {"vertices": 10000, "edges": 121716},
        "medium": {"vertices": 100000, "edges": 1768515},
        "large": {"vertices": 1632803, "edges": 30622564},
    }
    PROPERTIES_ON_EDGES = False

    # Helpers used to generate the queries

    def _get_random_vertex(self):
        # All vertices in the Pokec dataset have an ID in the range
        # [1, _num_vertices].
        return random.randint(1, self._num_vertices)

    def _get_random_from_to(self):
        vertex_from = self._get_random_vertex()
        vertex_to = vertex_from
        while vertex_to == vertex_from:
            vertex_to = self._get_random_vertex()
        return (vertex_from, vertex_to)

    # Arango benchmarks

    def benchmark__arango__single_vertex_read(self):
        return ("MATCH (n:User {id : $id}) RETURN n",
                {"id": self._get_random_vertex()})

    def benchmark__arango__single_vertex_write(self):
        return ("CREATE (n:UserTemp {id : $id}) RETURN n",
                {"id": random.randint(1, self._num_vertices * 10)})

    def benchmark__arango__single_edge_write(self):
        vertex_from, vertex_to = self._get_random_from_to()
        return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
                "CREATE (n)-[e:Temp]->(m) RETURN e",
                {"from": vertex_from, "to": vertex_to})

    def benchmark__arango__aggregate(self):
        return ("MATCH (n:User) RETURN n.age, COUNT(*)", {})

    def benchmark__arango__aggregate_with_filter(self):
        return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {})

    def benchmark__arango__expansion_1(self):
        return ("MATCH (s:User {id: $id})-->(n:User) "
                "RETURN n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__expansion_1_with_filter(self):
        return ("MATCH (s:User {id: $id})-->(n:User) "
                "WHERE n.age >= 18 "
                "RETURN n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__expansion_2(self):
        return ("MATCH (s:User {id: $id})-->()-->(n:User) "
                "RETURN DISTINCT n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__expansion_2_with_filter(self):
        return ("MATCH (s:User {id: $id})-->()-->(n:User) "
                "WHERE n.age >= 18 "
                "RETURN DISTINCT n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__expansion_3(self):
        return ("MATCH (s:User {id: $id})-->()-->()-->(n:User) "
                "RETURN DISTINCT n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__expansion_3_with_filter(self):
        return ("MATCH (s:User {id: $id})-->()-->()-->(n:User) "
                "WHERE n.age >= 18 "
                "RETURN DISTINCT n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__expansion_4(self):
        return ("MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) "
                "RETURN DISTINCT n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__expansion_4_with_filter(self):
        return ("MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) "
                "WHERE n.age >= 18 "
                "RETURN DISTINCT n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__neighbours_2(self):
        return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
                "RETURN DISTINCT n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__neighbours_2_with_filter(self):
        return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
                "WHERE n.age >= 18 "
                "RETURN DISTINCT n.id",
                {"id": self._get_random_vertex()})

    def benchmark__arango__neighbours_2_with_data(self):
        return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
                "RETURN DISTINCT n.id, n",
                {"id": self._get_random_vertex()})

    def benchmark__arango__neighbours_2_with_data_and_filter(self):
        return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
                "WHERE n.age >= 18 "
                "RETURN DISTINCT n.id, n",
                {"id": self._get_random_vertex()})

    def benchmark__arango__shortest_path(self):
        vertex_from, vertex_to = self._get_random_from_to()
        return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
                "MATCH p=(n)-[*bfs..15]->(m) "
                "RETURN extract(n in nodes(p) | n.id) AS path",
                {"from": vertex_from, "to": vertex_to})

    def benchmark__arango__shortest_path_with_filter(self):
        vertex_from, vertex_to = self._get_random_from_to()
        return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
                "MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) "
                "RETURN extract(n in nodes(p) | n.id) AS path",
                {"from": vertex_from, "to": vertex_to})

    # Our benchmark queries

    def benchmark__create__edge(self):
        vertex_from, vertex_to = self._get_random_from_to()
        return ("MATCH (a:User {id: $from}), (b:User {id: $to}) "
                "CREATE (a)-[:TempEdge]->(b)",
                {"from": vertex_from, "to": vertex_to})

    def benchmark__create__pattern(self):
        return ("CREATE ()-[:TempEdge]->()", {})

    def benchmark__create__vertex(self):
        return ("CREATE ()", {})

    def benchmark__create__vertex_big(self):
        return ("CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, "
                "p3: \"Here is some text that is not extremely short\", "
                "p4:\"Short text\", p5: 234.434, p6: 11.11, p7: false})", {})

    def benchmark__aggregation__count(self):
        return ("MATCH (n) RETURN count(n), count(n.age)", {})

    def benchmark__aggregation__min_max_avg(self):
        return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {})

    def benchmark__match__pattern_cycle(self):
        return ("MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) "
                "RETURN e1, m, e2",
                {"id": self._get_random_vertex()})

    def benchmark__match__pattern_long(self):
        return ("MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->"
                "(n3)-[e3]->(n4)<-[e4]-(n5) "
                "RETURN n5 LIMIT 1",
                {"id": self._get_random_vertex()})

    def benchmark__match__pattern_short(self):
        return ("MATCH (n:User {id: $id})-[e]->(m) "
                "RETURN m LIMIT 1",
                {"id": self._get_random_vertex()})

    def benchmark__match__vertex_on_label_property(self):
        return ("MATCH (n:User) WITH n WHERE n.id = $id RETURN n",
                {"id": self._get_random_vertex()})

    def benchmark__match__vertex_on_label_property_index(self):
        return ("MATCH (n:User {id: $id}) RETURN n",
                {"id": self._get_random_vertex()})

    def benchmark__match__vertex_on_property(self):
        return ("MATCH (n {id: $id}) RETURN n",
                {"id": self._get_random_vertex()})
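
Note: new datasets follow the same template: subclass Dataset, fill in the class-level constants, and add benchmark__GROUP__TEST generator methods that return a (query, params) pair. A hypothetical minimal example (the class name, file path, sizes and query are placeholders, not part of this commit):

    class Example(Dataset):
        NAME = "example"
        VARIANTS = ["default"]
        DEFAULT_VARIANT = "default"
        FILES = {"default": "/path/to/example.setup.cypher"}
        URLS = None
        SIZES = {"default": {"vertices": 100, "edges": 200}}
        PROPERTIES_ON_EDGES = False

        def benchmark__basic__single_vertex_read(self):
            return ("MATCH (n:Node {id: $id}) RETURN n",
                    {"id": random.randint(1, self._num_vertices)})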

tests/mgbench/helpers.py (new file, 103 lines)
@@ -0,0 +1,103 @@
import copy
import json
import os
import subprocess


SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


def get_binary_path(path, base=""):
    dirpath = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
    if os.path.exists(os.path.join(dirpath, "build_release")):
        dirpath = os.path.join(dirpath, "build_release")
    else:
        dirpath = os.path.join(dirpath, "build")
    return os.path.join(dirpath, path)


def download_file(url, path):
    ret = subprocess.run(["wget", "-nv", "--content-disposition", url],
                         stderr=subprocess.PIPE, cwd=path, check=True)
    data = ret.stderr.decode("utf-8")
    tmp = data.split("->")[1]
    name = tmp[tmp.index('"') + 1:tmp.rindex('"')]
    return os.path.join(path, name)


def unpack_and_move_file(input_path, output_path):
    if input_path.endswith(".gz"):
        subprocess.run(["gunzip", input_path],
                       stdout=subprocess.DEVNULL, check=True)
        input_path = input_path[:-3]
    os.rename(input_path, output_path)


def ensure_directory(path):
    if not os.path.exists(path):
        os.makedirs(path)
    if not os.path.isdir(path):
        raise Exception("The path '{}' should be a directory!".format(path))


class Directory:
    def __init__(self, path):
        self._path = path

    def get_path(self):
        return self._path

    def get_file(self, name):
        path = os.path.join(self._path, name)
        if os.path.exists(path) and not os.path.isfile(path):
            raise Exception("The path '{}' should be a file!".format(path))
        return (path, os.path.isfile(path))


class RecursiveDict:
    def __init__(self, data={}):
        self._data = copy.deepcopy(data)

    def _get_obj_and_key(self, *args):
        key = args[-1]
        obj = self._data
        for item in args[:-1]:
            if item not in obj:
                obj[item] = {}
            obj = obj[item]
        return (obj, key)

    def get_value(self, *args):
        obj, key = self._get_obj_and_key(*args)
        return obj.get(key, None)

    def set_value(self, *args, value=None):
        obj, key = self._get_obj_and_key(*args)
        obj[key] = value

    def get_data(self):
        return copy.deepcopy(self._data)


class Cache:
    def __init__(self):
        self._directory = os.path.join(SCRIPT_DIR, ".cache")
        ensure_directory(self._directory)
        self._config = os.path.join(self._directory, "config.json")

    def cache_directory(self, *args):
        if len(args) == 0:
            raise ValueError("At least one directory level must be supplied!")
        path = os.path.join(self._directory, *args)
        ensure_directory(path)
        return Directory(path)

    def load_config(self):
        if not os.path.isfile(self._config):
            return RecursiveDict()
        with open(self._config) as f:
            return RecursiveDict(json.load(f))

    def save_config(self, config):
        with open(self._config, "w") as f:
            json.dump(config.get_data(), f)
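
Note: a small illustration of how the cache primitives above compose; set_value()/get_value() address nested keys positionally, which is how benchmark.py stores cached query counts:

    config = RecursiveDict()
    config.set_value("pokec", "small", "arango", "aggregate",
                     value={"count": 1000, "duration": 10})
    # Existing path returns the stored value, missing paths return None.
    assert config.get_value("pokec", "small", "arango", "aggregate")["count"] == 1000
    assert config.get_value("pokec", "large", "arango", "aggregate") is None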

tests/mgbench/log.py (new file, 31 lines)
@@ -0,0 +1,31 @@
COLOR_GRAY = 0
COLOR_RED = 1
COLOR_GREEN = 2
COLOR_YELLOW = 3
COLOR_BLUE = 4
COLOR_VIOLET = 5
COLOR_CYAN = 6


def log(color, *args):
    print("\033[1;3{}m~~".format(color), *args, "~~\033[0m")


def init(*args):
    log(COLOR_BLUE, *args)


def info(*args):
    log(COLOR_CYAN, *args)


def success(*args):
    log(COLOR_GREEN, *args)


def warning(*args):
    log(COLOR_YELLOW, *args)


def error(*args):
    log(COLOR_RED, *args)

tests/mgbench/runners.py (new file, 148 lines)
@@ -0,0 +1,148 @@
import atexit
import json
import os
import re
import subprocess
import tempfile
import time


def wait_for_server(port, delay=0.1):
    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while subprocess.call(cmd) != 0:
        time.sleep(0.01)
    time.sleep(delay)


def _convert_args_to_flags(*args, **kwargs):
    flags = list(args)
    for key, value in kwargs.items():
        key = "--" + key.replace("_", "-")
        if type(value) == bool:
            flags.append(key + "=" + str(value).lower())
        else:
            flags.append(key)
            flags.append(str(value))
    return flags


def _get_usage(pid):
    total_cpu = 0
    with open("/proc/{}/stat".format(pid)) as f:
        total_cpu = (sum(map(int, f.read().split(")")[1].split()[11:15])) /
                     os.sysconf(os.sysconf_names["SC_CLK_TCK"]))
    peak_rss = 0
    with open("/proc/{}/status".format(pid)) as f:
        for row in f:
            tmp = row.split()
            if tmp[0] == "VmHWM:":
                peak_rss = int(tmp[1]) * 1024
    return {"cpu": total_cpu, "memory": peak_rss}


class Memgraph:
    def __init__(self, memgraph_binary, temporary_dir, properties_on_edges):
        self._memgraph_binary = memgraph_binary
        self._directory = tempfile.TemporaryDirectory(dir=temporary_dir)
        self._properties_on_edges = properties_on_edges
        self._proc_mg = None
        atexit.register(self._cleanup)

        # Determine Memgraph version
        ret = subprocess.run([memgraph_binary, "--version"],
                             stdout=subprocess.PIPE, check=True)
        version = re.search(r"[0-9]+\.[0-9]+\.[0-9]+",
                            ret.stdout.decode("utf-8")).group(0)
        self._memgraph_version = tuple(map(int, version.split(".")))

    def __del__(self):
        self._cleanup()
        atexit.unregister(self._cleanup)

    def _get_args(self, **kwargs):
        data_directory = os.path.join(self._directory.name, "memgraph")
        if self._memgraph_version >= (0, 50, 0):
            kwargs["data_directory"] = data_directory
        else:
            kwargs["durability_directory"] = data_directory
        if self._memgraph_version >= (0, 50, 0):
            kwargs["storage_properties_on_edges"] = self._properties_on_edges
        else:
            assert self._properties_on_edges, \
                "Older versions of Memgraph can't disable properties on edges!"
        kwargs["min_log_level"] = 1
        return _convert_args_to_flags(self._memgraph_binary, **kwargs)

    def _start(self, **kwargs):
        if self._proc_mg is not None:
            raise Exception("The database process is already running!")
        args = self._get_args(**kwargs)
        self._proc_mg = subprocess.Popen(args, stdout=subprocess.DEVNULL)
        time.sleep(0.2)
        if self._proc_mg.poll() is not None:
            self._proc_mg = None
            raise Exception("The database process died prematurely!")
        wait_for_server(7687)
        ret = self._proc_mg.poll()
        assert ret is None, "The database process died prematurely " \
            "({})!".format(ret)

    def _cleanup(self):
        if self._proc_mg is None:
            return 0
        usage = _get_usage(self._proc_mg.pid)
        self._proc_mg.terminate()
        ret = self._proc_mg.wait()
        self._proc_mg = None
        return ret, usage

    def start_preparation(self):
        if self._memgraph_version >= (0, 50, 0):
            self._start(storage_snapshot_on_exit=True)
        else:
            self._start(snapshot_on_exit=True)

    def start_benchmark(self):
        # TODO: support custom benchmarking config files!
        if self._memgraph_version >= (0, 50, 0):
            self._start(storage_recover_on_startup=True)
        else:
            self._start(db_recover_on_startup=True)

    def stop(self):
        ret, usage = self._cleanup()
        assert ret == 0, "The database process exited with a non-zero " \
            "status ({})!".format(ret)
        return usage


class Client:
    def __init__(self, client_binary, temporary_directory):
        self._client_binary = client_binary
        self._directory = tempfile.TemporaryDirectory(dir=temporary_directory)

    def _get_args(self, **kwargs):
        return _convert_args_to_flags(self._client_binary, **kwargs)

    def execute(self, queries=None, file_path=None, num_workers=1):
        if (queries is None and file_path is None) or \
                (queries is not None and file_path is not None):
            raise ValueError("Either queries or file_path must be specified!")

        # TODO: check `file_path.endswith(".json")` to support advanced
        # input queries

        queries_json = False
        if queries is not None:
            queries_json = True
            file_path = os.path.join(self._directory.name, "queries.json")
            with open(file_path, "w") as f:
                for query in queries:
                    json.dump(query, f)
                    f.write("\n")

        args = self._get_args(input=file_path, num_workers=num_workers,
                              queries_json=queries_json)
        ret = subprocess.run(args, stdout=subprocess.PIPE, check=True)
        data = ret.stdout.decode("utf-8").strip().split("\n")
        return list(map(json.loads, data))
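
Note: for reference, _convert_args_to_flags() above turns keyword arguments into command-line flags, booleans becoming --flag=true/false and everything else a flag/value pair (values shown are illustrative):

    flags = _convert_args_to_flags("memgraph", data_directory="/tmp/mg",
                                   storage_properties_on_edges=True,
                                   min_log_level=1)
    # ['memgraph', '--data-directory', '/tmp/mg',
    #  '--storage-properties-on-edges=true', '--min-log-level', '1']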