replaced all occurences of wait_for_server for wait_for_port from gqlalchemy

This commit is contained in:
Boris Tasevski 2022-04-28 16:02:51 +02:00
parent 4abaf27765
commit ed4c4e6823
12 changed files with 155 additions and 206 deletions

View File

@ -12,30 +12,18 @@
import copy
import os
import subprocess
import sys
import tempfile
import time
import mgclient
from gqlalchemy import wait_for_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(PROJECT_DIR, "build")
MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph")
def wait_for_server(port, delay=0.01):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
count = 0
while subprocess.call(cmd) != 0:
time.sleep(0.01)
if count > 10 / 0.01:
print("Could not wait for server on port", port, "to startup!")
sys.exit(1)
count += 1
time.sleep(delay)
def extract_bolt_port(args):
for arg_index, arg in enumerate(args):
if arg.startswith("--bolt-port="):
@ -88,8 +76,9 @@ class MemgraphInstanceRunner:
] + self.args
self.bolt_port = extract_bolt_port(args_mg)
self.proc_mg = subprocess.Popen(args_mg)
wait_for_server(self.bolt_port)
self.conn = mgclient.connect(host=self.host, port=self.bolt_port, sslmode=self.ssl)
wait_for_port(port=self.bolt_port)
self.conn = mgclient.connect(
host=self.host, port=self.bolt_port, sslmode=self.ssl)
self.conn.autocommit = True
assert self.is_running(), "The Memgraph process died!"

View File

@ -20,9 +20,10 @@ import sys
import json
import subprocess
import tempfile
import time
import yaml
from gqlalchemy import wait_for_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
TESTS_DIR = os.path.join(SCRIPT_DIR, "tests")
@ -30,18 +31,6 @@ BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")
def wait_for_server(port, delay=0.01):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
count = 0
while subprocess.call(cmd) != 0:
time.sleep(0.01)
if count > 20 / 0.01:
print("Could not wait for server on port", port, "to startup!")
sys.exit(1)
count += 1
time.sleep(delay)
def generate_result_csv(suite, result_path):
if not os.path.exists(result_path):
return ""
@ -105,7 +94,7 @@ class MemgraphRunner():
args_mg = [memgraph_binary, "--storage-properties-on-edges",
"--data-directory", self.data_directory.name]
self.proc_mg = subprocess.Popen(args_mg + self.args)
wait_for_server(7687, 1)
wait_for_port(port=7687, delay=1)
assert self.is_running(), "The Memgraph process died!"
def is_running(self):

View File

@ -21,6 +21,8 @@ import sys
import tempfile
import time
from gqlalchemy import wait_for_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
@ -58,13 +60,6 @@ QUERIES = [
]
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def execute_test(memgraph_binary, tester_binary):
storage_directory = tempfile.TemporaryDirectory()
memgraph_args = [
@ -80,7 +75,7 @@ def execute_test(memgraph_binary, tester_binary):
memgraph = subprocess.Popen(list(map(str, memgraph_args)))
time.sleep(0.1)
assert memgraph.poll() is None, "Memgraph process died prematurely!"
wait_for_server(7687)
wait_for_port(port=7687)
# Register cleanup function
@atexit.register

View File

@ -19,6 +19,8 @@ import sys
import tempfile
import time
from gqlalchemy import wait_for_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
@ -159,13 +161,6 @@ UNAUTHORIZED_ERROR = "You are not authorized to execute this query! Please " \
"contact your database administrator."
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def execute_tester(binary, queries, should_fail=False, failure_message="",
username="", password="", check_failure=True):
args = [binary, "--username", username, "--password", password]
@ -217,7 +212,7 @@ def execute_test(memgraph_binary, tester_binary, checker_binary):
memgraph = subprocess.Popen(list(map(str, memgraph_args)))
time.sleep(0.1)
assert memgraph.poll() is None, "Memgraph process died prematurely!"
wait_for_server(7687)
wait_for_port(port=7687)
# Register cleanup function
@atexit.register
@ -248,7 +243,7 @@ def execute_test(memgraph_binary, tester_binary, checker_binary):
admin_queries = ["REVOKE ALL PRIVILEGES FROM uSer"]
if len(user_perms) > 0:
admin_queries.append(
"GRANT {} TO User".format(", ".join(user_perms)))
"GRANT {} TO User".format(", ".join(user_perms)))
execute_admin_queries(admin_queries)
authorized, unauthorized = [], []
for query, query_perms in QUERIES:

View File

@ -20,6 +20,8 @@ import sys
import tempfile
import time
from gqlalchemy import wait_for_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
@ -32,15 +34,8 @@ DUMP_SNAPSHOT_FILE_NAME = "expected_snapshot.cypher"
DUMP_WAL_FILE_NAME = "expected_wal.cypher"
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def sorted_content(file_path):
with open(file_path, 'r') as fin:
with open(file_path, "r") as fin:
return sorted(list(map(lambda x: x.strip(), fin.readlines())))
@ -53,37 +48,41 @@ def list_to_string(data):
def execute_test(
memgraph_binary,
dump_binary,
test_directory,
test_type,
write_expected):
assert test_type in ["SNAPSHOT", "WAL"], \
"Test type should be either 'SNAPSHOT' or 'WAL'."
print("\033[1;36m~~ Executing test {} ({}) ~~\033[0m"
.format(os.path.relpath(test_directory, TESTS_DIR), test_type))
memgraph_binary, dump_binary, test_directory, test_type, write_expected
):
assert test_type in [
"SNAPSHOT",
"WAL",
], "Test type should be either 'SNAPSHOT' or 'WAL'."
print(
"\033[1;36m~~ Executing test {} ({}) ~~\033[0m".format(
os.path.relpath(test_directory, TESTS_DIR), test_type
)
)
working_data_directory = tempfile.TemporaryDirectory()
if test_type == "SNAPSHOT":
snapshots_dir = os.path.join(working_data_directory.name, "snapshots")
os.makedirs(snapshots_dir)
shutil.copy(os.path.join(test_directory, SNAPSHOT_FILE_NAME),
snapshots_dir)
shutil.copy(os.path.join(test_directory, SNAPSHOT_FILE_NAME), snapshots_dir)
else:
wal_dir = os.path.join(working_data_directory.name, "wal")
os.makedirs(wal_dir)
shutil.copy(os.path.join(test_directory, WAL_FILE_NAME), wal_dir)
memgraph_args = [memgraph_binary,
"--storage-recover-on-startup",
"--storage-properties-on-edges",
"--data-directory", working_data_directory.name]
memgraph_args = [
memgraph_binary,
"--storage-recover-on-startup",
"--storage-properties-on-edges",
"--data-directory",
working_data_directory.name,
]
# Start the memgraph binary
memgraph = subprocess.Popen(memgraph_args)
time.sleep(0.1)
assert memgraph.poll() is None, "Memgraph process died prematurely!"
wait_for_server(7687)
wait_for_port(port=7687, delay=0.1)
# Register cleanup function
@atexit.register
@ -101,25 +100,30 @@ def execute_test(
memgraph.terminate()
assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"
dump_file_name = DUMP_SNAPSHOT_FILE_NAME if test_type == "SNAPSHOT" else DUMP_WAL_FILE_NAME
dump_file_name = (
DUMP_SNAPSHOT_FILE_NAME if test_type == "SNAPSHOT" else DUMP_WAL_FILE_NAME
)
if write_expected:
with open(dump_output_file.name, 'r') as dump:
with open(dump_output_file.name, "r") as dump:
queries_got = dump.readlines()
# Write dump files
expected_dump_file = os.path.join(test_directory, dump_file_name)
with open(expected_dump_file, 'w') as expected:
with open(expected_dump_file, "w") as expected:
expected.writelines(queries_got)
else:
# Compare dump files
expected_dump_file = os.path.join(test_directory, dump_file_name)
assert os.path.exists(expected_dump_file), \
"Could not find expected dump path {}".format(expected_dump_file)
assert os.path.exists(
expected_dump_file
), "Could not find expected dump path {}".format(expected_dump_file)
queries_got = sorted_content(dump_output_file.name)
queries_expected = sorted_content(expected_dump_file)
assert queries_got == queries_expected, "Expected\n{}\nto be equal to\n" \
"{}".format(list_to_string(queries_got),
list_to_string(queries_expected))
assert (
queries_got == queries_expected
), "Expected\n{}\nto be equal to\n" "{}".format(
list_to_string(queries_got), list_to_string(queries_expected)
)
print("\033[1;32m~~ Test successful ~~\033[0m\n")
@ -141,15 +145,19 @@ def find_test_directories(directory):
continue
snapshot_file = os.path.join(test_dir_path, SNAPSHOT_FILE_NAME)
wal_file = os.path.join(test_dir_path, WAL_FILE_NAME)
dump_snapshot_file = os.path.join(
test_dir_path, DUMP_SNAPSHOT_FILE_NAME)
dump_snapshot_file = os.path.join(test_dir_path, DUMP_SNAPSHOT_FILE_NAME)
dump_wal_file = os.path.join(test_dir_path, DUMP_WAL_FILE_NAME)
if (os.path.isfile(snapshot_file) and os.path.isfile(dump_snapshot_file)
and os.path.isfile(wal_file) and os.path.isfile(dump_wal_file)):
if (
os.path.isfile(snapshot_file)
and os.path.isfile(dump_snapshot_file)
and os.path.isfile(wal_file)
and os.path.isfile(dump_wal_file)
):
test_dirs.append(test_dir_path)
else:
raise Exception("Missing data in test directory '{}'"
.format(test_dir_path))
raise Exception(
"Missing data in test directory '{}'".format(test_dir_path)
)
return test_dirs
@ -161,9 +169,10 @@ if __name__ == "__main__":
parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--dump", default=dump_binary)
parser.add_argument(
'--write-expected',
action='store_true',
help='Overwrite the expected cypher with results from current run')
"--write-expected",
action="store_true",
help="Overwrite the expected cypher with results from current run",
)
args = parser.parse_args()
test_directories = find_test_directories(TESTS_DIR)
@ -171,16 +180,10 @@ if __name__ == "__main__":
for test_directory in test_directories:
execute_test(
args.memgraph,
args.dump,
test_directory,
"SNAPSHOT",
args.write_expected)
args.memgraph, args.dump, test_directory, "SNAPSHOT", args.write_expected
)
execute_test(
args.memgraph,
args.dump,
test_directory,
"WAL",
args.write_expected)
args.memgraph, args.dump, test_directory, "WAL", args.write_expected
)
sys.exit(0)

View File

@ -20,6 +20,8 @@ import sys
import tempfile
import time
from gqlalchemy import wait_for_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
@ -45,13 +47,6 @@ roles:
"""
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def execute_tester(binary, queries, username="", password="",
auth_should_fail=False, query_should_fail=False):
if password == "":
@ -121,7 +116,7 @@ class Memgraph:
time.sleep(0.1)
assert self._process.poll() is None, "Memgraph process died " \
"prematurely!"
wait_for_server(7687)
wait_for_port(port=7687)
def stop(self, check=True):
if self._process is None:
@ -394,7 +389,7 @@ if __name__ == "__main__":
slapd = subprocess.Popen(slapd_args)
time.sleep(0.1)
assert slapd.poll() is None, "slapd process died prematurely!"
wait_for_server(1389)
wait_for_port(port=1389)
# Register cleanup function
@atexit.register

View File

@ -20,19 +20,14 @@ import tempfile
import time
import yaml
from gqlalchemy import wait_for_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def extract_rows(data):
return list(map(lambda x: x.strip(), data.strip().split("\n")))
@ -60,7 +55,7 @@ def verify_lifetime(memgraph_binary, mg_import_csv_binary):
memgraph = subprocess.Popen(list(map(str, memgraph_args)))
time.sleep(0.1)
assert memgraph.poll() is None, "Memgraph process died prematurely!"
wait_for_server(7687)
wait_for_port(port=7687)
# Register cleanup function
@atexit.register
@ -141,7 +136,7 @@ def execute_test(name, test_path, test_config, memgraph_binary,
memgraph = subprocess.Popen(list(map(str, memgraph_args)))
time.sleep(0.1)
assert memgraph.poll() is None, "Memgraph process died prematurely!"
wait_for_server(7687)
wait_for_port(port=7687)
# Register cleanup function
@atexit.register

View File

@ -11,13 +11,12 @@
import logging
import os
import subprocess
from argparse import ArgumentParser
from collections import defaultdict
import tempfile
import shutil
import time
from common import get_absolute_path, set_cpus
from gqlalchemy import wait_for_port
try:
import jail
@ -25,24 +24,18 @@ except:
import jail_faker as jail
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
class Memgraph:
"""
Knows how to start and stop memgraph.
"""
def __init__(self, args, num_workers):
self.log = logging.getLogger("MemgraphRunner")
argp = ArgumentParser("MemgraphArgumentParser")
argp.add_argument("--runner-bin",
default=get_absolute_path("memgraph", "build"))
argp.add_argument("--port", default="7687",
help="Database and client port")
argp.add_argument(
"--runner-bin", default=get_absolute_path("memgraph", "build")
)
argp.add_argument("--port", default="7687", help="Database and client port")
argp.add_argument("--data-directory", default=None)
argp.add_argument("--storage-snapshot-on-exit", action="store_true")
argp.add_argument("--storage-recover-on-startup", action="store_true")
@ -55,8 +48,12 @@ class Memgraph:
def start(self):
self.log.info("start")
database_args = ["--bolt-port", self.args.port,
"--query-execution-timeout-sec", "0"]
database_args = [
"--bolt-port",
self.args.port,
"--query-execution-timeout-sec",
"0",
]
if self.num_workers:
database_args += ["--bolt-num-workers", str(self.num_workers)]
if self.args.data_directory:
@ -71,7 +68,7 @@ class Memgraph:
# start memgraph
self.database_bin.run(runner_bin, database_args, timeout=600)
wait_for_server(self.args.port)
wait_for_port(port=self.args.port)
def stop(self):
self.database_bin.send_signal(jail.SIGTERM)
@ -82,15 +79,17 @@ class Neo:
"""
Knows how to start and stop neo4j.
"""
def __init__(self, args, config):
self.log = logging.getLogger("NeoRunner")
argp = ArgumentParser("NeoArgumentParser")
argp.add_argument("--runner-bin", default=get_absolute_path(
"neo4j/bin/neo4j", "libs"))
argp.add_argument("--port", default="7687",
help="Database and client port")
argp.add_argument("--http-port", default="7474",
help="Database and client port")
argp.add_argument(
"--runner-bin", default=get_absolute_path("neo4j/bin/neo4j", "libs")
)
argp.add_argument("--port", default="7687", help="Database and client port")
argp.add_argument(
"--http-port", default="7474", help="Database and client port"
)
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.config = config
@ -105,29 +104,36 @@ class Neo:
self.neo4j_home_path = tempfile.mkdtemp(dir="/dev/shm")
try:
os.symlink(os.path.join(get_absolute_path("neo4j", "libs"), "lib"),
os.path.join(self.neo4j_home_path, "lib"))
os.symlink(
os.path.join(get_absolute_path("neo4j", "libs"), "lib"),
os.path.join(self.neo4j_home_path, "lib"),
)
neo4j_conf_dir = os.path.join(self.neo4j_home_path, "conf")
neo4j_conf_file = os.path.join(neo4j_conf_dir, "neo4j.conf")
os.mkdir(neo4j_conf_dir)
shutil.copyfile(self.config, neo4j_conf_file)
with open(neo4j_conf_file, "a") as f:
f.write("\ndbms.connector.bolt.listen_address=:" +
self.args.port + "\n")
f.write("\ndbms.connector.http.listen_address=:" +
self.args.http_port + "\n")
f.write(
"\ndbms.connector.bolt.listen_address=:" + self.args.port + "\n"
)
f.write(
"\ndbms.connector.http.listen_address=:"
+ self.args.http_port
+ "\n"
)
# environment
cwd = os.path.dirname(self.args.runner_bin)
env = {"NEO4J_HOME": self.neo4j_home_path}
self.database_bin.run(self.args.runner_bin, args=["console"],
env=env, timeout=600, cwd=cwd)
self.database_bin.run(
self.args.runner_bin, args=["console"], env=env, timeout=600, cwd=cwd
)
except:
shutil.rmtree(self.neo4j_home_path)
raise Exception("Couldn't run Neo4j!")
wait_for_server(self.args.http_port, 2.0)
wait_for_port(port=self.args.http_port, delay=2.0)
def stop(self):
self.database_bin.send_signal(jail.SIGTERM)

View File

@ -17,13 +17,7 @@ import subprocess
import tempfile
import time
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
from gqlalchemy import wait_for_port
def _convert_args_to_flags(*args, **kwargs):
flags = list(args)
@ -92,7 +86,7 @@ class Memgraph:
if self._proc_mg.poll() is not None:
self._proc_mg = None
raise Exception("The database process died prematurely!")
wait_for_server(7687)
wait_for_port(port=7687)
ret = self._proc_mg.poll()
assert ret is None, "The database process died prematurely " \
"({})!".format(ret)

View File

@ -15,17 +15,12 @@ import subprocess
import tempfile
import time
from gqlalchemy import wait_for_port
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
def wait_for_server(port, delay=1.0):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.5)
time.sleep(delay)
class Memgraph:
def __init__(self, dataset, port, num_workers):
self.proc = None
@ -51,7 +46,7 @@ class Memgraph:
# start memgraph
self.proc = subprocess.Popen(database_args, env=env)
wait_for_server(self.port)
wait_for_port(port=self.port, delay=1.0)
def stop(self):
self.proc.terminate()
@ -99,7 +94,7 @@ class Neo:
shutil.rmtree(self.home_dir)
raise Exception("Couldn't run Neo4j!")
wait_for_server(self.http_port, 2.0)
wait_for_port(port=self.http_port, delay=2.0)
def stop(self):
self.proc.terminate()
@ -112,7 +107,7 @@ class Neo:
def parse_args():
argp = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
argp.add_argument('--scale', type=int, default=1,
help='Dataset scale to use for benchmarking.')
argp.add_argument('--host', default='127.0.0.1', help='Database host.')
@ -196,11 +191,13 @@ def main():
'-p', 'host', args.host, '-p', 'port', args.port,
'-db', 'net.ellitron.ldbcsnbimpls.interactive.neo4j.Neo4jDb',
'-p', 'ldbc.snb.interactive.parameters_dir', parameters_dir,
'--time_compression_ratio', str(args.time_compression_ratio),
'--time_compression_ratio', str(
args.time_compression_ratio),
'--operation_count', str(args.operation_count),
'--thread_count', str(args.thread_count),
'--time_unit', args.time_unit.upper())
subprocess.check_call(java_cmd, cwd=os.path.join(SCRIPT_DIR, 'ldbc_driver'))
subprocess.check_call(
java_cmd, cwd=os.path.join(SCRIPT_DIR, 'ldbc_driver'))
# Copy the results to results dir.
ldbc_results = os.path.join(SCRIPT_DIR, 'ldbc_driver', 'results',

View File

@ -3,13 +3,13 @@
import argparse
import atexit
import json
import multiprocessing
import os
import subprocess
import sys
import time
from gqlalchemy import wait_for_port
# dataset calibrated for running on Apollo (total 4min)
# bipartite.py runs for approx. 30s
# create_match.py runs for approx. 30s
@ -87,13 +87,6 @@ else:
THREADS = multiprocessing.cpu_count()
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
# run test helper function
def run_test(args, test, options, timeout):
print("Running test '{}'".format(test))
@ -102,7 +95,7 @@ def run_test(args, test, options, timeout):
if test.endswith(".py"):
logging = "DEBUG" if args.verbose else "WARNING"
binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test),
"--logging", logging]
"--logging", logging]
elif test.endswith(".cpp"):
exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4])
binary = [exe]
@ -112,11 +105,11 @@ def run_test(args, test, options, timeout):
# start test
cmd = binary + ["--worker-count", str(THREADS)] + options
start = time.time()
ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, timeout = timeout * 60)
ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60)
if ret_test.returncode != 0:
raise Exception("Test '{}' binary returned non-zero ({})!".format(
test, ret_test.returncode))
test, ret_test.returncode))
runtime = time.time() - start
print(" Done after {:.3f} seconds".format(runtime))
@ -125,19 +118,19 @@ def run_test(args, test, options, timeout):
# parse arguments
parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.")
parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR,
"memgraph"))
parser.add_argument("--log-file", default = "")
parser.add_argument("--data-directory", default = "")
parser.add_argument("--python", default = os.path.join(SCRIPT_DIR,
"ve3", "bin", "python3"), type = str)
parser.add_argument("--large-dataset", action = "store_const",
const = True, default = False)
parser.add_argument("--use-ssl", action = "store_const",
const = True, default = False)
parser.add_argument("--verbose", action = "store_const",
const = True, default = False)
parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR,
"memgraph"))
parser.add_argument("--log-file", default="")
parser.add_argument("--data-directory", default="")
parser.add_argument("--python", default=os.path.join(SCRIPT_DIR,
"ve3", "bin", "python3"), type=str)
parser.add_argument("--large-dataset", action="store_const",
const=True, default=False)
parser.add_argument("--use-ssl", action="store_const",
const=True, default=False)
parser.add_argument("--verbose", action="store_const",
const=True, default=False)
args = parser.parse_args()
# generate temporary SSL certs
@ -145,8 +138,8 @@ if args.use_ssl:
# https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively
subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com"
subprocess.run(["openssl", "req", "-new", "-newkey", "rsa:4096",
"-days", "365", "-nodes", "-x509", "-subj", subj,
"-keyout", KEY_FILE, "-out", CERT_FILE], check=True)
"-days", "365", "-nodes", "-x509", "-subj", subj,
"-keyout", KEY_FILE, "-out", CERT_FILE], check=True)
# start memgraph
cwd = os.path.dirname(args.memgraph)
@ -166,18 +159,22 @@ if args.data_directory:
cmd += ["--data-directory", args.data_directory]
if args.use_ssl:
cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE]
proc_mg = subprocess.Popen(cmd, cwd = cwd)
wait_for_server(7687)
proc_mg = subprocess.Popen(cmd, cwd=cwd)
wait_for_port(port=7687, delay=0.1)
assert proc_mg.poll() is None, "The database binary died prematurely!"
# at exit cleanup
@atexit.register
def cleanup():
global proc_mg
if proc_mg.poll() != None: return
if proc_mg.poll() != None:
return
proc_mg.kill()
proc_mg.wait()
# run tests
runtimes = {}
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET

View File

@ -17,6 +17,7 @@ import time
import threading
from common import connection_argument_parser, SessionCache
from gqlalchemy import wait_for_port
from multiprocessing import Pool, Manager
# Constants and args
@ -83,18 +84,11 @@ def clean_memgraph():
proc_mg.wait()
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def run_memgraph():
global proc_mg
proc_mg = subprocess.Popen(cmd, cwd=cwd)
# Wait for Memgraph to finish the recovery process
wait_for_server(args.endpoint.split(":")[1])
wait_for_port(port=args.endpoint.split(":")[1], delay=0.1)
def run_client(id, data):