99494d77e3
Reviewers: mislav.bradac Reviewed By: mislav.bradac Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D826
234 lines
9.0 KiB
Python
Executable File
234 lines
9.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
'''
|
|
Run the LDBC SNB interactive workload / benchmark.
|
|
The benchmark is executed with:
|
|
* ldbc_driver -> workload executor
|
|
* ldbc-snb-impls/snb-interactive-neo4j -> workload implementation
|
|
'''
|
|
|
|
import argparse
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
|
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
|
|
|
|
|
|
def wait_for_server(port, delay=1.0):
|
|
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
|
|
while subprocess.call(cmd) != 0:
|
|
time.sleep(0.5)
|
|
time.sleep(delay)
|
|
|
|
|
|
class Memgraph:
|
|
def __init__(self, dataset, port, num_workers):
|
|
self.proc = None
|
|
self.dataset = dataset
|
|
self.port = str(port)
|
|
self.num_workers = str(num_workers)
|
|
|
|
def start(self):
|
|
# find executable path
|
|
binary = os.path.join(BASE_DIR, "build", "memgraph")
|
|
if not os.path.exists(binary):
|
|
binary = os.path.join(BASE_DIR, "build_release", "memgraph")
|
|
|
|
# database args
|
|
database_args = [binary, "--num-workers", self.num_workers,
|
|
"--snapshot-directory", os.path.join(self.dataset,
|
|
"memgraph"),
|
|
"--snapshot-recover-on-startup", "true",
|
|
"--port", self.port]
|
|
|
|
# database env
|
|
env = {"MEMGRAPH_CONFIG": os.path.join(SCRIPT_DIR, "config",
|
|
"memgraph.conf")}
|
|
|
|
# start memgraph
|
|
self.proc = subprocess.Popen(database_args, env=env)
|
|
wait_for_server(self.port)
|
|
|
|
def stop(self):
|
|
self.proc.terminate()
|
|
if self.proc.wait() != 0:
|
|
raise Exception("Database exited with non-zero exit code!")
|
|
|
|
|
|
class Neo:
|
|
def __init__(self, dataset, port):
|
|
self.proc = None
|
|
self.dataset = dataset
|
|
self.port = str(port)
|
|
self.http_port = str(int(port) + 7474)
|
|
self.home_dir = None
|
|
|
|
def start(self):
|
|
# create home directory
|
|
self.home_dir = tempfile.mkdtemp(dir=self.dataset)
|
|
|
|
neo4j_dir = os.path.join(BASE_DIR, "libs", "neo4j")
|
|
|
|
try:
|
|
os.symlink(os.path.join(neo4j_dir, "lib"),
|
|
os.path.join(self.home_dir, "lib"))
|
|
shutil.copytree(os.path.join(self.dataset, "neo4j"),
|
|
os.path.join(self.home_dir, "data"))
|
|
conf_dir = os.path.join(self.home_dir, "conf")
|
|
conf_file = os.path.join(conf_dir, "neo4j.conf")
|
|
os.mkdir(conf_dir)
|
|
shutil.copyfile(os.path.join(SCRIPT_DIR, "config", "neo4j.conf"),
|
|
conf_file)
|
|
with open(conf_file, "a") as f:
|
|
f.write("\ndbms.connector.bolt.listen_address=:" +
|
|
self.port + "\n")
|
|
f.write("\ndbms.connector.http.listen_address=:" +
|
|
self.http_port + "\n")
|
|
|
|
# environment
|
|
env = {"NEO4J_HOME": self.home_dir}
|
|
|
|
self.proc = subprocess.Popen([os.path.join(neo4j_dir, "bin",
|
|
"neo4j"),
|
|
"console"], env=env, cwd=neo4j_dir)
|
|
except:
|
|
shutil.rmtree(self.home_dir)
|
|
raise Exception("Couldn't run Neo4j!")
|
|
|
|
wait_for_server(self.http_port, 2.0)
|
|
|
|
def stop(self):
|
|
self.proc.terminate()
|
|
ret = self.proc.wait()
|
|
if os.path.exists(self.home_dir):
|
|
shutil.rmtree(self.home_dir)
|
|
if ret != 0:
|
|
raise Exception("Database exited with non-zero exit code!")
|
|
|
|
|
|
def parse_args():
|
|
argp = argparse.ArgumentParser(
|
|
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
argp.add_argument('--scale', type=int, default=1,
|
|
help='Dataset scale to use for benchmarking.')
|
|
argp.add_argument('--host', default='127.0.0.1', help='Database host.')
|
|
argp.add_argument('--port', default='7687', help='Database port.')
|
|
argp.add_argument('--time-compression-ratio', type=float, default=0.001,
|
|
help='Compress/stretch durations between operation start '
|
|
'times to increase/decrease benchmark load. '
|
|
'E.g. 2.0 = run benchmark 2x slower, 0.1 = run '
|
|
'benchmark 10x faster. Default is 0.001.')
|
|
argp.add_argument('--operation-count', type=int, default=1000,
|
|
help='Number of operations to generate during benchmark '
|
|
'execution.')
|
|
argp.add_argument('--thread-count', type=int, default=8,
|
|
help='Thread pool size to use for executing operation '
|
|
'handlers.')
|
|
argp.add_argument('--time-unit', default='microseconds',
|
|
choices=('nanoseconds', 'microseconds', 'milliseconds',
|
|
'seconds', 'minutes'),
|
|
help='Time unit to use for measuring performance metrics')
|
|
argp.add_argument('--result-file-prefix', default='',
|
|
help='Result file name prefix')
|
|
argp.add_argument('--test-type', choices=('reads', 'updates'),
|
|
default='reads', help='Test queries of type')
|
|
argp.add_argument('--run-db', choices=('memgraph', 'neo4j'),
|
|
help='Run the database before starting LDBC')
|
|
argp.add_argument('--create-index', action='store_true', default=False,
|
|
help='Create index in the running database.')
|
|
return argp.parse_args()
|
|
|
|
|
|
LDBC_INTERACTIVE_NEO4J = \
|
|
os.path.join(SCRIPT_DIR,
|
|
'ldbc-snb-impls', 'snb-interactive-neo4j', 'target',
|
|
'snb-interactive-neo4j-1.0.0-jar-with-dependencies.jar')
|
|
LDBC_DEFAULT_PROPERTIES = \
|
|
os.path.join(SCRIPT_DIR, 'ldbc_driver', 'configuration',
|
|
'ldbc_driver_default.properties')
|
|
|
|
|
|
def create_index(port, database):
|
|
index_file = os.path.join(SCRIPT_DIR, 'ldbc-snb-impls',
|
|
'snb-interactive-neo4j', 'scripts', 'indexCreation.neo4j')
|
|
subprocess.check_call(('ve3/bin/python3', 'index_creation.py', '--port',
|
|
port, '--database', database, index_file),
|
|
cwd=SCRIPT_DIR)
|
|
time.sleep(1.0)
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
dataset = os.path.join(SCRIPT_DIR, "datasets", "scale_" + str(args.scale))
|
|
|
|
db = None
|
|
if args.run_db:
|
|
if args.host != "127.0.0.1":
|
|
raise Exception("Host parameter must point to localhost when "
|
|
"this script starts the database!")
|
|
if args.run_db.lower() == 'memgraph':
|
|
db = Memgraph(dataset, args.port, args.thread_count)
|
|
elif args.run_db.lower() == 'neo4j':
|
|
db = Neo(dataset, args.port)
|
|
|
|
try:
|
|
if db:
|
|
db.start()
|
|
time.sleep(10.0)
|
|
|
|
if args.create_index:
|
|
create_index(args.port, args.run_db.lower())
|
|
time.sleep(10.0)
|
|
|
|
# Run LDBC driver.
|
|
cp = 'target/jeeves-0.3-SNAPSHOT.jar:{}'.format(LDBC_INTERACTIVE_NEO4J)
|
|
updates_dir = os.path.join(dataset, 'social_network')
|
|
parameters_dir = os.path.join(dataset, 'substitution_parameters')
|
|
java_cmd = ('java', '-cp', cp, 'com.ldbc.driver.Client',
|
|
'-P', LDBC_DEFAULT_PROPERTIES,
|
|
'-P', os.path.join(SCRIPT_DIR, "ldbc-snb-impls-{}."
|
|
"properties".format(args.test_type)),
|
|
'-p', 'ldbc.snb.interactive.updates_dir', updates_dir,
|
|
'-p', 'host', args.host, '-p', 'port', args.port,
|
|
'-db', 'net.ellitron.ldbcsnbimpls.interactive.neo4j.Neo4jDb',
|
|
'-p', 'ldbc.snb.interactive.parameters_dir', parameters_dir,
|
|
'--time_compression_ratio', str(args.time_compression_ratio),
|
|
'--operation_count', str(args.operation_count),
|
|
'--thread_count', str(args.thread_count),
|
|
'--time_unit', args.time_unit.upper())
|
|
subprocess.check_call(java_cmd, cwd=os.path.join(SCRIPT_DIR, 'ldbc_driver'))
|
|
|
|
# Copy the results to results dir.
|
|
ldbc_results = os.path.join(SCRIPT_DIR, 'ldbc_driver', 'results',
|
|
'LDBC-results.json')
|
|
results_dir = os.path.join(SCRIPT_DIR, 'results')
|
|
results_name = []
|
|
if args.result_file_prefix:
|
|
results_name.append(args.result_file_prefix)
|
|
if args.run_db:
|
|
results_name.append(args.run_db)
|
|
else:
|
|
results_name.append("external")
|
|
results_name.append("scale_" + str(args.scale))
|
|
results_name = "-".join(results_name + ["LDBC", "results.json"])
|
|
results_copy = os.path.join(results_dir, results_name)
|
|
shutil.copyfile(ldbc_results, results_copy)
|
|
|
|
print("Results saved to:", results_copy)
|
|
|
|
finally:
|
|
if db:
|
|
db.stop()
|
|
|
|
print("Done!")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|