Refactor harness
Reviewers: mferencevic Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D732
This commit is contained in:
parent
24b52270e4
commit
bba5d134c0
@ -93,6 +93,11 @@ class Client {
|
||||
}
|
||||
}
|
||||
|
||||
Client(const Client &) = delete;
|
||||
Client(Client &&) = delete;
|
||||
Client &operator=(const Client &) = delete;
|
||||
Client &operator=(Client &&) = delete;
|
||||
|
||||
QueryData Execute(const std::string &query,
|
||||
const std::map<std::string, DecodedValue> ¶meters) {
|
||||
DLOG(INFO) << "Sending run message with statement: '" << query
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
namespace utils {
|
||||
|
||||
// This class is threadsafe.
|
||||
class Timer {
|
||||
public:
|
||||
/** Time elapsed since creation. */
|
||||
@ -12,7 +13,7 @@ class Timer {
|
||||
}
|
||||
|
||||
private:
|
||||
std::chrono::time_point<std::chrono::steady_clock> start_time_ =
|
||||
const std::chrono::time_point<std::chrono::steady_clock> start_time_ =
|
||||
std::chrono::steady_clock::now();
|
||||
};
|
||||
};
|
||||
|
63
tests/macro_benchmark/harness/common.hpp
Normal file
63
tests/macro_benchmark/harness/common.hpp
Normal file
@ -0,0 +1,63 @@
|
||||
#include <string>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
|
||||
namespace {
|
||||
|
||||
void PrintJsonDecodedValue(std::ostream &os,
|
||||
const communication::bolt::DecodedValue &value) {
|
||||
using communication::bolt::DecodedValue;
|
||||
switch (value.type()) {
|
||||
case DecodedValue::Type::Null:
|
||||
os << "null";
|
||||
break;
|
||||
case DecodedValue::Type::Bool:
|
||||
os << (value.ValueBool() ? "true" : "false");
|
||||
break;
|
||||
case DecodedValue::Type::Int:
|
||||
os << value.ValueInt();
|
||||
break;
|
||||
case DecodedValue::Type::Double:
|
||||
os << value.ValueDouble();
|
||||
break;
|
||||
case DecodedValue::Type::String:
|
||||
os << "\"" << value.ValueString() << "\"";
|
||||
break;
|
||||
case DecodedValue::Type::List:
|
||||
os << "[";
|
||||
PrintIterable(os, value.ValueList(), ", ",
|
||||
[](auto &stream, const auto &item) {
|
||||
PrintJsonDecodedValue(stream, item);
|
||||
});
|
||||
os << "]";
|
||||
break;
|
||||
case DecodedValue::Type::Map:
|
||||
os << "{";
|
||||
PrintIterable(os, value.ValueMap(), ", ",
|
||||
[](auto &stream, const auto &pair) {
|
||||
PrintJsonDecodedValue(stream, {pair.first});
|
||||
stream << ": ";
|
||||
PrintJsonDecodedValue(stream, pair.second);
|
||||
});
|
||||
os << "}";
|
||||
break;
|
||||
default:
|
||||
std::terminate();
|
||||
}
|
||||
}
|
||||
|
||||
template <typename SocketT>
|
||||
communication::bolt::QueryData ExecuteNTimesTillSuccess(
|
||||
communication::bolt::Client<SocketT> &client, const std::string &query,
|
||||
int times) {
|
||||
for (int i = 0; i < times; ++i) {
|
||||
try {
|
||||
auto ret = client.Execute(query, {});
|
||||
return ret;
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
}
|
||||
}
|
||||
throw communication::bolt::ClientQueryException();
|
||||
}
|
||||
}
|
@ -29,6 +29,96 @@ from perf import Perf
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
def load_scenarios(args):
|
||||
"""
|
||||
Scans through folder structure starting with groups_root and
|
||||
loads query scenarios.
|
||||
Expected folder structure is:
|
||||
groups_root/
|
||||
groupname1/
|
||||
config.json
|
||||
common.py
|
||||
setup.FILE_TYPE
|
||||
teardown.FILE_TYPE
|
||||
itersetup.FILE_TYPE
|
||||
iterteardown.FILE_TYPE
|
||||
scenario1.config.json
|
||||
scenario1.run.FILE_TYPE-------(mandatory)
|
||||
scenario1.setup.FILE_TYPE
|
||||
scenario1.teardown.FILE_TYPE
|
||||
scenario1.itersetup.FILE_TYPE
|
||||
scenario1.iterteardown.FILE_TYPE
|
||||
scenario2...
|
||||
...
|
||||
groupname2/
|
||||
...
|
||||
|
||||
Per query configs (setup, teardown, itersetup, iterteardown)
|
||||
override group configs for that scenario. Group configs must have one
|
||||
extension (.FILE_TYPE) and
|
||||
scenario configs must have 2 extensions (.scenario_name.FILE_TYPE).
|
||||
Each suite doesn't need to implement all query steps and filetypes.
|
||||
See documentation in each suite for supported ones.
|
||||
|
||||
Args:
|
||||
args: additional args parsed by this function
|
||||
group_paths: str, root folder that contains group folders
|
||||
Return:
|
||||
{group: (scenario, {config: query_generator_function})
|
||||
"""
|
||||
argp = ArgumentParser("QuerySuite.scenarios argument parser")
|
||||
argp.add_argument("--query-scenarios-root", default=path.join(
|
||||
DIR_PATH, "groups"),
|
||||
dest="root")
|
||||
args, _ = argp.parse_known_args()
|
||||
log.info("Loading query scenarios from root: %s", args.root)
|
||||
|
||||
def fill_config_dict(config_dict, base, config_files):
|
||||
for config_file in config_files:
|
||||
log.debug("Processing config file %s", config_file)
|
||||
config_name = config_file.split(".")[-2]
|
||||
config_dict[config_name] = QuerySuite.Loader(
|
||||
path.join(base, config_file))
|
||||
|
||||
# validate that the scenario does not contain any illegal
|
||||
# keys (defense against typos in file naming)
|
||||
unknown_keys = set(config_dict) - QuerySuite.KNOWN_KEYS
|
||||
if unknown_keys:
|
||||
raise Exception("Unknown QuerySuite config elements: '%r'" %
|
||||
unknown_keys)
|
||||
|
||||
def dir_content(root, predicate):
|
||||
return [p for p in os.listdir(root)
|
||||
if predicate(path.join(root, p))]
|
||||
|
||||
group_scenarios = OrderedDict()
|
||||
for group in dir_content(args.root, path.isdir):
|
||||
log.info("Loading group: '%s'", group)
|
||||
|
||||
group_scenarios[group] = []
|
||||
files = dir_content(path.join(args.root, group),
|
||||
path.isfile)
|
||||
|
||||
# process group default config
|
||||
group_config = {}
|
||||
fill_config_dict(group_config, path.join(args.root, group),
|
||||
[f for f in files if f.count(".") == 1])
|
||||
|
||||
# group files on scenario
|
||||
for scenario_name, scenario_files in itertools.groupby(
|
||||
filter(lambda f: f.count(".") == 2, sorted(files)),
|
||||
lambda x: x.split(".")[0]):
|
||||
log.info("Loading scenario: '%s'", scenario_name)
|
||||
scenario = dict(group_config)
|
||||
fill_config_dict(scenario,
|
||||
path.join(args.root, group),
|
||||
scenario_files)
|
||||
group_scenarios[group].append((scenario_name, scenario))
|
||||
log.debug("Loaded config for scenario '%s'\n%r", scenario_name,
|
||||
scenario)
|
||||
|
||||
return group_scenarios
|
||||
|
||||
|
||||
class _QuerySuite:
|
||||
"""
|
||||
@ -99,96 +189,6 @@ class _QuerySuite:
|
||||
def __repr__(self):
|
||||
return "(QuerySuite.Loader<%s>)" % self.file_path
|
||||
|
||||
@staticmethod
|
||||
def scenarios(args):
|
||||
"""
|
||||
Scans through folder structure starting with groups_root and
|
||||
loads query scenarios.
|
||||
Expected folder structure is:
|
||||
groups_root/
|
||||
groupname1/
|
||||
config.json
|
||||
common.py
|
||||
setup.FILE_TYPE
|
||||
teardown.FILE_TYPE
|
||||
itersetup.FILE_TYPE
|
||||
iterteardown.FILE_TYPE
|
||||
scenario1.config.json
|
||||
scenario1.run.FILE_TYPE-------(mandatory)
|
||||
scenario1.setup.FILE_TYPE
|
||||
scenario1.teardown.FILE_TYPE
|
||||
scenario1.itersetup.FILE_TYPE
|
||||
scenario1.iterteardown.FILE_TYPE
|
||||
scenario2...
|
||||
...
|
||||
groupname2/
|
||||
...
|
||||
|
||||
Per query configs (setup, teardown, itersetup, iterteardown)
|
||||
override group configs for that scenario. Group configs must have one
|
||||
extension (.FILE_TYPE) and
|
||||
scenario configs must have 2 extensions (.scenario_name.FILE_TYPE).
|
||||
See `QueryLoader` documentation to see which file types are supported.
|
||||
|
||||
Args:
|
||||
args: additional args parsed by this function
|
||||
group_paths: str, root folder that contains group folders
|
||||
Return:
|
||||
{group: (scenario, {config: query_generator_function})
|
||||
"""
|
||||
argp = ArgumentParser("QuerySuite.scenarios argument parser")
|
||||
argp.add_argument("--query-scenarios-root", default=path.join(
|
||||
DIR_PATH, "groups"),
|
||||
dest="root")
|
||||
args, _ = argp.parse_known_args()
|
||||
log.info("Loading query scenarios from root: %s", args.root)
|
||||
|
||||
def fill_config_dict(config_dict, base, config_files):
|
||||
for config_file in config_files:
|
||||
log.debug("Processing config file %s", config_file)
|
||||
config_name = config_file.split(".")[-2]
|
||||
config_dict[config_name] = QuerySuite.Loader(
|
||||
path.join(base, config_file))
|
||||
|
||||
# validate that the scenario does not contain any illegal
|
||||
# keys (defense against typos in file naming)
|
||||
unknown_keys = set(config_dict) - QuerySuite.KNOWN_KEYS
|
||||
if unknown_keys:
|
||||
raise Exception("Unknown QuerySuite config elements: '%r'" %
|
||||
unknown_keys)
|
||||
|
||||
def dir_content(root, predicate):
|
||||
return [p for p in os.listdir(root)
|
||||
if predicate(path.join(root, p))]
|
||||
|
||||
group_scenarios = OrderedDict()
|
||||
for group in dir_content(args.root, path.isdir):
|
||||
log.info("Loading group: '%s'", group)
|
||||
|
||||
group_scenarios[group] = []
|
||||
files = dir_content(path.join(args.root, group),
|
||||
path.isfile)
|
||||
|
||||
# process group default config
|
||||
group_config = {}
|
||||
fill_config_dict(group_config, path.join(args.root, group),
|
||||
[f for f in files if f.count(".") == 1])
|
||||
|
||||
# group files on scenario
|
||||
for scenario_name, scenario_files in itertools.groupby(
|
||||
filter(lambda f: f.count(".") == 2, sorted(files)),
|
||||
lambda x: x.split(".")[0]):
|
||||
log.info("Loading scenario: '%s'", scenario_name)
|
||||
scenario = dict(group_config)
|
||||
fill_config_dict(scenario,
|
||||
path.join(args.root, group),
|
||||
scenario_files)
|
||||
group_scenarios[group].append((scenario_name, scenario))
|
||||
log.debug("Loaded config for scenario '%s'\n%r", scenario_name,
|
||||
scenario)
|
||||
|
||||
return group_scenarios
|
||||
|
||||
def run(self, scenario, group_name, scenario_name, runner):
|
||||
log.debug("QuerySuite.run() with scenario: %s", scenario)
|
||||
scenario_config = scenario.get("config")
|
||||
@ -305,7 +305,95 @@ class QueryParallelSuite(_QuerySuite):
|
||||
return ["aggregation_parallel", "create_parallel"]
|
||||
|
||||
|
||||
class _BaseRunner:
|
||||
def get_common_runner_argument_parser():
|
||||
argp = ArgumentParser("CommonRunnerArgumentParser")
|
||||
argp.add_argument("--address", help="Database and client address",
|
||||
default="127.0.0.1")
|
||||
argp.add_argument("--port", help="Database and client port",
|
||||
default="7687")
|
||||
return argp
|
||||
|
||||
|
||||
# Database wrappers.
|
||||
|
||||
class Memgraph:
|
||||
"""
|
||||
Knows how to start and stop memgraph.
|
||||
"""
|
||||
def __init__(self, args, cpus=None):
|
||||
if cpus is None: cpus = [1]
|
||||
self.log = logging.getLogger("MemgraphRunner")
|
||||
argp = ArgumentParser("MemgraphArgumentParser", add_help=False,
|
||||
parents=[get_common_runner_argument_parser()])
|
||||
argp.add_argument("--RunnerBin",
|
||||
default=os.path.join(DIR_PATH,
|
||||
"../../../build/memgraph"))
|
||||
argp.add_argument("--RunnerConfig",
|
||||
default=os.path.normpath(os.path.join(
|
||||
DIR_PATH,
|
||||
"../../../config/benchmarking_latency.conf")))
|
||||
self.log.info("Initializing Runner with arguments %r", args)
|
||||
self.args, _ = argp.parse_known_args(args)
|
||||
self.database_bin = jail.get_process()
|
||||
self.database_bin.set_cpus(cpus)
|
||||
|
||||
def start(self):
|
||||
self.log.info("start")
|
||||
environment = os.environ.copy()
|
||||
environment["MEMGRAPH_CONFIG"] = self.args.RunnerConfig
|
||||
database_args = ["--interface", self.args.address,
|
||||
"--port", self.args.port]
|
||||
self.database_bin.run(self.args.RunnerBin, database_args,
|
||||
env=environment, timeout=600)
|
||||
# TODO change to a check via SIGUSR
|
||||
time.sleep(1.0)
|
||||
return self.database_bin.get_pid() if not APOLLO else None
|
||||
|
||||
def stop(self):
|
||||
self.database_bin.send_signal(jail.SIGTERM)
|
||||
self.database_bin.wait()
|
||||
|
||||
|
||||
class Neo:
|
||||
def __init__(self, args, cpus):
|
||||
if cpus is None: cpus = [1]
|
||||
self.log = logging.getLogger("NeoRunner")
|
||||
argp = ArgumentParser("NeoArgumentParser", add_help=False,
|
||||
parents=[get_common_runner_argument_parser()])
|
||||
argp.add_argument(
|
||||
"--RunnerConfigDir",
|
||||
default=path.join(DIR_PATH, "neo4j_config"))
|
||||
argp.add_argument(
|
||||
"--RunnerHomeDir",
|
||||
default=path.join(DIR_PATH, "neo4j_home"))
|
||||
self.log.info("Initializing Runner with arguments %r", args)
|
||||
self.args, _ = argp.parse_known_args(args)
|
||||
if self.args.address != "127.0.0.1" or self.args.port != "7687":
|
||||
raise Exception(
|
||||
"Neo wrapper doesn't support different address or port")
|
||||
self.database_bin = jail.get_process()
|
||||
self.database_bin.set_cpus(cpus)
|
||||
|
||||
def start(self):
|
||||
self.log.info("start")
|
||||
environment = os.environ.copy()
|
||||
environment["NEO4J_CONF"] = self.args.RunnerConfigDir
|
||||
environment["NEO4J_HOME"] = self.args.RunnerHomeDir
|
||||
neo4j_data_path = path.join(environment["NEO4J_HOME"], "data")
|
||||
if path.exists(neo4j_data_path):
|
||||
shutil.rmtree(neo4j_data_path)
|
||||
self.database_bin.run("/usr/share/neo4j/bin/neo4j", args=["console"],
|
||||
env=environment, timeout=600)
|
||||
# TODO change to a check via SIGUSR
|
||||
time.sleep(5.0)
|
||||
return self.database_bin.get_pid() if not APOLLO else None
|
||||
|
||||
def stop(self):
|
||||
self.database_bin.send_signal(jail.SIGTERM)
|
||||
self.database_bin.wait()
|
||||
|
||||
|
||||
class _HarnessClientRunner:
|
||||
"""
|
||||
Knows how to start and stop database (backend) some client frontend (bolt),
|
||||
and execute a cypher query.
|
||||
@ -314,19 +402,18 @@ class _BaseRunner:
|
||||
Inherited class should implement start method and initialise database_bin
|
||||
and bolt_client members of type Process.
|
||||
"""
|
||||
def __init__(self, args):
|
||||
self.log = logging.getLogger("_BaseRunner")
|
||||
|
||||
def _get_argparser(self):
|
||||
argp = ArgumentParser("RunnerArgumentParser")
|
||||
# TODO: This option should be passed to the database and client, not
|
||||
# only to the client as we are doing at the moment.
|
||||
argp.add_argument("--RunnerUri", default="127.0.0.1:7687")
|
||||
return argp
|
||||
def __init__(self, args, database, cpus=None):
|
||||
if cpus is None: cpus = [2, 3]
|
||||
self.log = logging.getLogger("_HarnessClientRunner")
|
||||
self.database = database
|
||||
argp = ArgumentParser("RunnerArgumentParser", add_help=False,
|
||||
parents=[get_common_runner_argument_parser()])
|
||||
self.args, _ = argp.parse_known_args()
|
||||
self.bolt_client = jail.get_process()
|
||||
self.bolt_client.set_cpus(cpus)
|
||||
|
||||
def start(self):
|
||||
raise NotImplementedError(
|
||||
"This method should be implemented in derivded class")
|
||||
self.database.start()
|
||||
|
||||
def execute(self, queries, num_client_workers):
|
||||
self.log.debug("execute('%s')", str(queries))
|
||||
@ -354,16 +441,15 @@ class _BaseRunner:
|
||||
output_fd, output = tempfile.mkstemp()
|
||||
os.close(output_fd)
|
||||
|
||||
address, port = self.args.RunnerUri.split(":")
|
||||
client_args = ["--address", address, "--port", port,
|
||||
client_args = ["--address", self.args.address, "--port", self.args.port,
|
||||
"--num-workers", str(num_client_workers),
|
||||
"--output", output]
|
||||
|
||||
cpu_time_start = self.database_bin.get_usage()["cpu"]
|
||||
cpu_time_start = self.database.database_bin.get_usage()["cpu"]
|
||||
# TODO make the timeout configurable per query or something
|
||||
return_code = self.bolt_client.run_and_wait(
|
||||
client, client_args, timeout=600, stdin=queries_path)
|
||||
cpu_time_end = self.database_bin.get_usage()["cpu"]
|
||||
cpu_time_end = self.database.database_bin.get_usage()["cpu"]
|
||||
os.remove(queries_path)
|
||||
if return_code != 0:
|
||||
with open(self.bolt_client.get_stderr()) as f:
|
||||
@ -378,88 +464,27 @@ class _BaseRunner:
|
||||
data[CPU_TIME] = cpu_time_end - cpu_time_start
|
||||
|
||||
os.remove(output)
|
||||
|
||||
return data
|
||||
|
||||
def stop(self):
|
||||
self.log.info("stop")
|
||||
self.bolt_client.wait()
|
||||
self.database_bin.send_signal(jail.SIGTERM)
|
||||
self.database_bin.wait()
|
||||
self.database.stop()
|
||||
|
||||
|
||||
class MemgraphRunner(_BaseRunner):
|
||||
"""
|
||||
Knows how to start and stop Memgraph (backend) some client frontent
|
||||
(bolt), and execute a cypher query.
|
||||
Execution returns benchmarking data (execution times, memory
|
||||
usage etc).
|
||||
"""
|
||||
def __init__(self, args):
|
||||
super(MemgraphRunner, self).__init__(args)
|
||||
self.log = logging.getLogger("MemgraphRunner")
|
||||
argp = self._get_argparser()
|
||||
argp.add_argument("--RunnerBin",
|
||||
default=os.path.join(DIR_PATH,
|
||||
"../../../build/memgraph"))
|
||||
argp.add_argument("--RunnerConfig",
|
||||
default=os.path.normpath(os.path.join(
|
||||
DIR_PATH,
|
||||
"../../../config/benchmarking_latency.conf")))
|
||||
# parse args
|
||||
self.log.info("Initializing Runner with arguments %r", args)
|
||||
self.args, _ = argp.parse_known_args(args)
|
||||
self.database_bin = jail.get_process()
|
||||
self.database_bin.set_cpus([1])
|
||||
self.bolt_client = jail.get_process()
|
||||
self.bolt_client.set_cpus([2, 3])
|
||||
|
||||
def start(self):
|
||||
self.log.info("start")
|
||||
environment = os.environ.copy()
|
||||
environment["MEMGRAPH_CONFIG"] = self.args.RunnerConfig
|
||||
self.database_bin.run(self.args.RunnerBin, env=environment,
|
||||
timeout=600)
|
||||
# TODO change to a check via SIGUSR
|
||||
time.sleep(1.0)
|
||||
return self.database_bin.get_pid() if not APOLLO else None
|
||||
class MemgraphRunner(_HarnessClientRunner):
|
||||
def __init__(self, args, client_cpus=None, database_cpus=None):
|
||||
database = Memgraph(args, database_cpus)
|
||||
super(MemgraphRunner, self).__init__(args, database, cpus=client_cpus)
|
||||
|
||||
|
||||
class NeoRunner(_BaseRunner):
|
||||
def __init__(self, args):
|
||||
super(NeoRunner, self).__init__(args)
|
||||
self.log = logging.getLogger("NeoRunner")
|
||||
argp = self._get_argparser()
|
||||
argp.add_argument(
|
||||
"--RunnerConfigDir",
|
||||
default=path.join(DIR_PATH, "neo4j_config"))
|
||||
argp.add_argument(
|
||||
"--RunnerHomeDir",
|
||||
default=path.join(DIR_PATH, "neo4j_home"))
|
||||
# parse args
|
||||
self.log.info("Initializing Runner with arguments %r", args)
|
||||
self.args, _ = argp.parse_known_args(args)
|
||||
self.database_bin = jail.get_process()
|
||||
self.database_bin.set_cpus([1])
|
||||
self.bolt_client = jail.get_process()
|
||||
self.bolt_client.set_cpus([2, 3])
|
||||
|
||||
def start(self):
|
||||
self.log.info("start")
|
||||
environment = os.environ.copy()
|
||||
environment["NEO4J_CONF"] = self.args.RunnerConfigDir
|
||||
environment["NEO4J_HOME"] = self.args.RunnerHomeDir
|
||||
neo4j_data_path = path.join(environment["NEO4J_HOME"], "data")
|
||||
if path.exists(neo4j_data_path):
|
||||
shutil.rmtree(neo4j_data_path)
|
||||
self.database_bin.run("/usr/share/neo4j/bin/neo4j", args=["console"],
|
||||
env=environment, timeout=600)
|
||||
# TODO change to a check via SIGUSR
|
||||
time.sleep(5.0)
|
||||
return self.database_bin.get_pid() if not APOLLO else None
|
||||
class NeoRunner(_HarnessClientRunner):
|
||||
def __init__(self, args, client_cpus=None, database_cpus=None):
|
||||
database = Neo(args, database_cpus)
|
||||
super(NeoRunner, self).__init__(args, database, cpus=client_cpus)
|
||||
|
||||
|
||||
def parse_known_args():
|
||||
def main():
|
||||
argp = ArgumentParser(description=__doc__)
|
||||
# positional, mandatory args
|
||||
argp.add_argument("suite", help="Suite to run.")
|
||||
@ -476,11 +501,8 @@ def parse_known_args():
|
||||
argp.add_argument("--no-strict", default=False, action="store_true",
|
||||
help="Ignores nonexisting groups instead of raising an "
|
||||
"exception")
|
||||
return argp.parse_known_args()
|
||||
args, remaining_args = argp.parse_known_args()
|
||||
|
||||
|
||||
def main():
|
||||
args, remaining_args = parse_known_args()
|
||||
if args.logging:
|
||||
logging.basicConfig(level=args.logging)
|
||||
logging.getLogger("requests").setLevel(logging.WARNING)
|
||||
@ -489,7 +511,7 @@ def main():
|
||||
log.info("Memgraph benchmark suite harness")
|
||||
log.info("Executing for suite '%s', runner '%s'", args.suite, args.runner)
|
||||
|
||||
# Create suite
|
||||
# Create suites.
|
||||
suites = {"QuerySuite": QuerySuite,
|
||||
"QueryParallelSuite": QueryParallelSuite}
|
||||
if args.suite not in suites:
|
||||
@ -498,21 +520,20 @@ def main():
|
||||
args.suite, suites))
|
||||
suite = suites[args.suite](remaining_args)
|
||||
|
||||
# Load scenarios
|
||||
group_scenarios = suites[args.suite].scenarios(remaining_args)
|
||||
# Load scenarios.
|
||||
group_scenarios = load_scenarios(remaining_args)
|
||||
log.info("Loaded %d groups, with a total of %d scenarios",
|
||||
len(group_scenarios),
|
||||
sum([len(x) for x in group_scenarios.values()]))
|
||||
|
||||
# Create runner
|
||||
# Create runners.
|
||||
runners = {"MemgraphRunner": MemgraphRunner, "NeoRunner": NeoRunner}
|
||||
# TODO if make runner argument optional, then execute all runners
|
||||
if args.runner not in suite.runners():
|
||||
raise Exception("Runner '{}' not registered for suite '{}'".format(
|
||||
args.runner, args.suite))
|
||||
runner = runners[args.runner](remaining_args)
|
||||
|
||||
# Validate groups (if provided)
|
||||
# Validate groups (if provided).
|
||||
groups = []
|
||||
if args.groups:
|
||||
for group in args.groups:
|
||||
@ -529,6 +550,7 @@ def main():
|
||||
# No groups provided, use all suite group
|
||||
groups = suite.groups()
|
||||
|
||||
# Filter scenarios.
|
||||
# TODO enable scenario filtering on regex
|
||||
filtered_scenarios = OrderedDict()
|
||||
for group, scenarios in group_scenarios.items():
|
||||
@ -544,6 +566,7 @@ def main():
|
||||
log.info("No scenarios to execute")
|
||||
return
|
||||
|
||||
# Run scenarios.
|
||||
log.info("Executing %d scenarios", len(filtered_scenarios))
|
||||
results = []
|
||||
for (group, scenario_name), scenario in sorted(filtered_scenarios.items()):
|
||||
@ -553,6 +576,8 @@ def main():
|
||||
iter_result["group"] = group
|
||||
iter_result["scenario"] = scenario_name
|
||||
results.append(iter_result)
|
||||
|
||||
# Save results.
|
||||
run = dict()
|
||||
run["suite"] = args.suite
|
||||
run["runner"] = runner.__class__.__name__
|
||||
@ -560,6 +585,8 @@ def main():
|
||||
run.update(args.additional_run_fields)
|
||||
for result in results:
|
||||
jail.store_data(result)
|
||||
|
||||
# Print summary.
|
||||
print("\n\nMacro benchmark summary:")
|
||||
print("{}\n".format(suite.summary))
|
||||
with open(os.path.join(DIR_PATH, ".harness_summary"), "w") as f:
|
||||
|
@ -4,7 +4,9 @@
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "common.hpp"
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "io/network/network_endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
@ -14,7 +16,7 @@
|
||||
using SocketT = io::network::Socket;
|
||||
using EndpointT = io::network::NetworkEndpoint;
|
||||
using ClientT = communication::bolt::Client<SocketT>;
|
||||
using DecodedValueT = communication::bolt::DecodedValue;
|
||||
using communication::bolt::DecodedValue;
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_string(port, "7687", "Server port");
|
||||
@ -25,49 +27,9 @@ DEFINE_string(password, "", "Password for the database");
|
||||
|
||||
const int MAX_RETRIES = 1000;
|
||||
|
||||
void PrintJsonDecodedValue(std::ostream &os, const DecodedValueT &value) {
|
||||
switch (value.type()) {
|
||||
case DecodedValueT::Type::Null:
|
||||
os << "null";
|
||||
break;
|
||||
case DecodedValueT::Type::Bool:
|
||||
os << (value.ValueBool() ? "true" : "false");
|
||||
break;
|
||||
case DecodedValueT::Type::Int:
|
||||
os << value.ValueInt();
|
||||
break;
|
||||
case DecodedValueT::Type::Double:
|
||||
os << value.ValueDouble();
|
||||
break;
|
||||
case DecodedValueT::Type::String:
|
||||
os << "\"" << value.ValueString() << "\"";
|
||||
break;
|
||||
case DecodedValueT::Type::List:
|
||||
os << "[";
|
||||
PrintIterable(os, value.ValueList(), ", ",
|
||||
[](auto &stream, const auto &item) {
|
||||
PrintJsonDecodedValue(stream, item);
|
||||
});
|
||||
os << "]";
|
||||
break;
|
||||
case DecodedValueT::Type::Map:
|
||||
os << "{";
|
||||
PrintIterable(os, value.ValueMap(), ", ",
|
||||
[](auto &stream, const auto &pair) {
|
||||
PrintJsonDecodedValue(stream, {pair.first});
|
||||
stream << ": ";
|
||||
PrintJsonDecodedValue(stream, pair.second);
|
||||
});
|
||||
os << "}";
|
||||
break;
|
||||
default:
|
||||
std::terminate();
|
||||
}
|
||||
}
|
||||
|
||||
void PrintJsonMetadata(
|
||||
std::ostream &os,
|
||||
const std::vector<std::map<std::string, DecodedValueT>> &metadata) {
|
||||
const std::vector<std::map<std::string, DecodedValue>> &metadata) {
|
||||
os << "[";
|
||||
PrintIterable(os, metadata, ", ", [](auto &stream, const auto &item) {
|
||||
PrintJsonDecodedValue(stream, item);
|
||||
@ -77,7 +39,7 @@ void PrintJsonMetadata(
|
||||
|
||||
void PrintSummary(
|
||||
std::ostream &os, double duration,
|
||||
const std::vector<std::map<std::string, DecodedValueT>> &metadata) {
|
||||
const std::vector<std::map<std::string, DecodedValue>> &metadata) {
|
||||
os << "{\"wall_time\": " << duration << ", "
|
||||
<< "\"metadatas\": ";
|
||||
PrintJsonMetadata(os, metadata);
|
||||
@ -94,7 +56,7 @@ int main(int argc, char **argv) {
|
||||
SpinLock spinlock;
|
||||
uint64_t last = 0;
|
||||
std::vector<std::string> queries;
|
||||
std::vector<std::map<std::string, DecodedValueT>> metadata;
|
||||
std::vector<std::map<std::string, DecodedValue>> metadata;
|
||||
|
||||
while (std::getline(std::cin, query)) {
|
||||
queries.push_back(query);
|
||||
@ -111,10 +73,12 @@ int main(int argc, char **argv) {
|
||||
try {
|
||||
endpoint = EndpointT(FLAGS_address, FLAGS_port);
|
||||
} catch (const io::network::NetworkEndpointException &e) {
|
||||
std::terminate();
|
||||
LOG(FATAL) << "Invalid address or port: " << FLAGS_address << ":"
|
||||
<< FLAGS_port;
|
||||
}
|
||||
if (!socket.Connect(endpoint)) {
|
||||
std::terminate();
|
||||
LOG(FATAL) << "Could not connect to: " << FLAGS_address << ":"
|
||||
<< FLAGS_port;
|
||||
}
|
||||
|
||||
ClientT client(std::move(socket), FLAGS_username, FLAGS_password);
|
||||
@ -130,17 +94,12 @@ int main(int argc, char **argv) {
|
||||
pos = last++;
|
||||
str = queries[pos];
|
||||
}
|
||||
int i;
|
||||
for (i = 0; i < MAX_RETRIES; ++i) {
|
||||
try {
|
||||
auto ret = client.Execute(str, {});
|
||||
metadata[pos] = ret.metadata;
|
||||
break;
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
}
|
||||
}
|
||||
if (i == MAX_RETRIES) {
|
||||
std::terminate();
|
||||
try {
|
||||
metadata[pos] =
|
||||
ExecuteNTimesTillSuccess(client, str, MAX_RETRIES).metadata;
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
LOG(FATAL) << "Could not execute query '" << str << "' "
|
||||
<< MAX_RETRIES << "times";
|
||||
}
|
||||
}
|
||||
client.Close();
|
||||
|
@ -36,7 +36,9 @@ class Process:
|
||||
self._usage = {}
|
||||
self._files = []
|
||||
|
||||
def run(self, binary, args = [], env = {}, timeout = 120, stdin = "/dev/null", cwd = "."):
|
||||
def run(self, binary, args = None, env = None, timeout = 120, stdin = "/dev/null", cwd = "."):
|
||||
if args is None: args = []
|
||||
if env is None: env = {}
|
||||
# don't start a new process if one is already running
|
||||
if self._proc != None and self._proc.returncode == None:
|
||||
raise ProcessException
|
||||
|
Loading…
Reference in New Issue
Block a user