From ed4c4e6823055b573c08b1c6f7eb7f3cdb16615d Mon Sep 17 00:00:00 2001 From: Boris Tasevski Date: Thu, 28 Apr 2022 16:02:51 +0200 Subject: [PATCH] replaced all occurences of wait_for_server for wait_for_port from gqlalchemy --- tests/e2e/memgraph.py | 21 ++--- tests/gql_behave/continuous_integration | 17 +--- tests/integration/audit/runner.py | 11 +-- tests/integration/auth/runner.py | 13 +-- tests/integration/durability/runner.py | 105 +++++++++++----------- tests/integration/ldap/runner.py | 13 +-- tests/integration/mg_import_csv/runner.py | 13 +-- tests/macro_benchmark/databases.py | 70 ++++++++------- tests/mgbench/runners.py | 10 +-- tests/public_benchmark/ldbc/run_benchmark | 21 ++--- tests/stress/continuous_integration | 57 ++++++------ tests/stress/durability | 10 +-- 12 files changed, 155 insertions(+), 206 deletions(-) diff --git a/tests/e2e/memgraph.py b/tests/e2e/memgraph.py index dffb019a8..3190f93c0 100755 --- a/tests/e2e/memgraph.py +++ b/tests/e2e/memgraph.py @@ -12,30 +12,18 @@ import copy import os import subprocess -import sys import tempfile -import time import mgclient +from gqlalchemy import wait_for_port + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) BUILD_DIR = os.path.join(PROJECT_DIR, "build") MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph") -def wait_for_server(port, delay=0.01): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - count = 0 - while subprocess.call(cmd) != 0: - time.sleep(0.01) - if count > 10 / 0.01: - print("Could not wait for server on port", port, "to startup!") - sys.exit(1) - count += 1 - time.sleep(delay) - - def extract_bolt_port(args): for arg_index, arg in enumerate(args): if arg.startswith("--bolt-port="): @@ -88,8 +76,9 @@ class MemgraphInstanceRunner: ] + self.args self.bolt_port = extract_bolt_port(args_mg) self.proc_mg = subprocess.Popen(args_mg) - wait_for_server(self.bolt_port) - self.conn = mgclient.connect(host=self.host, port=self.bolt_port, sslmode=self.ssl) + wait_for_port(port=self.bolt_port) + self.conn = mgclient.connect( + host=self.host, port=self.bolt_port, sslmode=self.ssl) self.conn.autocommit = True assert self.is_running(), "The Memgraph process died!" diff --git a/tests/gql_behave/continuous_integration b/tests/gql_behave/continuous_integration index 35efd90a5..8fb6c2fcb 100755 --- a/tests/gql_behave/continuous_integration +++ b/tests/gql_behave/continuous_integration @@ -20,9 +20,10 @@ import sys import json import subprocess import tempfile -import time import yaml +from gqlalchemy import wait_for_port + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) TESTS_DIR = os.path.join(SCRIPT_DIR, "tests") @@ -30,18 +31,6 @@ BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) BUILD_DIR = os.path.join(BASE_DIR, "build") -def wait_for_server(port, delay=0.01): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - count = 0 - while subprocess.call(cmd) != 0: - time.sleep(0.01) - if count > 20 / 0.01: - print("Could not wait for server on port", port, "to startup!") - sys.exit(1) - count += 1 - time.sleep(delay) - - def generate_result_csv(suite, result_path): if not os.path.exists(result_path): return "" @@ -105,7 +94,7 @@ class MemgraphRunner(): args_mg = [memgraph_binary, "--storage-properties-on-edges", "--data-directory", self.data_directory.name] self.proc_mg = subprocess.Popen(args_mg + self.args) - wait_for_server(7687, 1) + wait_for_port(port=7687, delay=1) assert self.is_running(), "The Memgraph process died!" def is_running(self): diff --git a/tests/integration/audit/runner.py b/tests/integration/audit/runner.py index 92e175a1b..c159f1a99 100755 --- a/tests/integration/audit/runner.py +++ b/tests/integration/audit/runner.py @@ -21,6 +21,8 @@ import sys import tempfile import time +from gqlalchemy import wait_for_port + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) @@ -58,13 +60,6 @@ QUERIES = [ ] -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - def execute_test(memgraph_binary, tester_binary): storage_directory = tempfile.TemporaryDirectory() memgraph_args = [ @@ -80,7 +75,7 @@ def execute_test(memgraph_binary, tester_binary): memgraph = subprocess.Popen(list(map(str, memgraph_args))) time.sleep(0.1) assert memgraph.poll() is None, "Memgraph process died prematurely!" - wait_for_server(7687) + wait_for_port(port=7687) # Register cleanup function @atexit.register diff --git a/tests/integration/auth/runner.py b/tests/integration/auth/runner.py index 7953bc1ee..d993bb7ca 100755 --- a/tests/integration/auth/runner.py +++ b/tests/integration/auth/runner.py @@ -19,6 +19,8 @@ import sys import tempfile import time +from gqlalchemy import wait_for_port + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) @@ -159,13 +161,6 @@ UNAUTHORIZED_ERROR = "You are not authorized to execute this query! Please " \ "contact your database administrator." -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - def execute_tester(binary, queries, should_fail=False, failure_message="", username="", password="", check_failure=True): args = [binary, "--username", username, "--password", password] @@ -217,7 +212,7 @@ def execute_test(memgraph_binary, tester_binary, checker_binary): memgraph = subprocess.Popen(list(map(str, memgraph_args))) time.sleep(0.1) assert memgraph.poll() is None, "Memgraph process died prematurely!" - wait_for_server(7687) + wait_for_port(port=7687) # Register cleanup function @atexit.register @@ -248,7 +243,7 @@ def execute_test(memgraph_binary, tester_binary, checker_binary): admin_queries = ["REVOKE ALL PRIVILEGES FROM uSer"] if len(user_perms) > 0: admin_queries.append( - "GRANT {} TO User".format(", ".join(user_perms))) + "GRANT {} TO User".format(", ".join(user_perms))) execute_admin_queries(admin_queries) authorized, unauthorized = [], [] for query, query_perms in QUERIES: diff --git a/tests/integration/durability/runner.py b/tests/integration/durability/runner.py index dd8c41456..e0a202097 100755 --- a/tests/integration/durability/runner.py +++ b/tests/integration/durability/runner.py @@ -20,6 +20,8 @@ import sys import tempfile import time +from gqlalchemy import wait_for_port + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) @@ -32,15 +34,8 @@ DUMP_SNAPSHOT_FILE_NAME = "expected_snapshot.cypher" DUMP_WAL_FILE_NAME = "expected_wal.cypher" -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - def sorted_content(file_path): - with open(file_path, 'r') as fin: + with open(file_path, "r") as fin: return sorted(list(map(lambda x: x.strip(), fin.readlines()))) @@ -53,37 +48,41 @@ def list_to_string(data): def execute_test( - memgraph_binary, - dump_binary, - test_directory, - test_type, - write_expected): - assert test_type in ["SNAPSHOT", "WAL"], \ - "Test type should be either 'SNAPSHOT' or 'WAL'." - print("\033[1;36m~~ Executing test {} ({}) ~~\033[0m" - .format(os.path.relpath(test_directory, TESTS_DIR), test_type)) + memgraph_binary, dump_binary, test_directory, test_type, write_expected +): + assert test_type in [ + "SNAPSHOT", + "WAL", + ], "Test type should be either 'SNAPSHOT' or 'WAL'." + print( + "\033[1;36m~~ Executing test {} ({}) ~~\033[0m".format( + os.path.relpath(test_directory, TESTS_DIR), test_type + ) + ) working_data_directory = tempfile.TemporaryDirectory() if test_type == "SNAPSHOT": snapshots_dir = os.path.join(working_data_directory.name, "snapshots") os.makedirs(snapshots_dir) - shutil.copy(os.path.join(test_directory, SNAPSHOT_FILE_NAME), - snapshots_dir) + shutil.copy(os.path.join(test_directory, SNAPSHOT_FILE_NAME), snapshots_dir) else: wal_dir = os.path.join(working_data_directory.name, "wal") os.makedirs(wal_dir) shutil.copy(os.path.join(test_directory, WAL_FILE_NAME), wal_dir) - memgraph_args = [memgraph_binary, - "--storage-recover-on-startup", - "--storage-properties-on-edges", - "--data-directory", working_data_directory.name] + memgraph_args = [ + memgraph_binary, + "--storage-recover-on-startup", + "--storage-properties-on-edges", + "--data-directory", + working_data_directory.name, + ] # Start the memgraph binary memgraph = subprocess.Popen(memgraph_args) time.sleep(0.1) assert memgraph.poll() is None, "Memgraph process died prematurely!" - wait_for_server(7687) + wait_for_port(port=7687, delay=0.1) # Register cleanup function @atexit.register @@ -101,25 +100,30 @@ def execute_test( memgraph.terminate() assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!" - dump_file_name = DUMP_SNAPSHOT_FILE_NAME if test_type == "SNAPSHOT" else DUMP_WAL_FILE_NAME + dump_file_name = ( + DUMP_SNAPSHOT_FILE_NAME if test_type == "SNAPSHOT" else DUMP_WAL_FILE_NAME + ) if write_expected: - with open(dump_output_file.name, 'r') as dump: + with open(dump_output_file.name, "r") as dump: queries_got = dump.readlines() # Write dump files expected_dump_file = os.path.join(test_directory, dump_file_name) - with open(expected_dump_file, 'w') as expected: + with open(expected_dump_file, "w") as expected: expected.writelines(queries_got) else: # Compare dump files expected_dump_file = os.path.join(test_directory, dump_file_name) - assert os.path.exists(expected_dump_file), \ - "Could not find expected dump path {}".format(expected_dump_file) + assert os.path.exists( + expected_dump_file + ), "Could not find expected dump path {}".format(expected_dump_file) queries_got = sorted_content(dump_output_file.name) queries_expected = sorted_content(expected_dump_file) - assert queries_got == queries_expected, "Expected\n{}\nto be equal to\n" \ - "{}".format(list_to_string(queries_got), - list_to_string(queries_expected)) + assert ( + queries_got == queries_expected + ), "Expected\n{}\nto be equal to\n" "{}".format( + list_to_string(queries_got), list_to_string(queries_expected) + ) print("\033[1;32m~~ Test successful ~~\033[0m\n") @@ -141,15 +145,19 @@ def find_test_directories(directory): continue snapshot_file = os.path.join(test_dir_path, SNAPSHOT_FILE_NAME) wal_file = os.path.join(test_dir_path, WAL_FILE_NAME) - dump_snapshot_file = os.path.join( - test_dir_path, DUMP_SNAPSHOT_FILE_NAME) + dump_snapshot_file = os.path.join(test_dir_path, DUMP_SNAPSHOT_FILE_NAME) dump_wal_file = os.path.join(test_dir_path, DUMP_WAL_FILE_NAME) - if (os.path.isfile(snapshot_file) and os.path.isfile(dump_snapshot_file) - and os.path.isfile(wal_file) and os.path.isfile(dump_wal_file)): + if ( + os.path.isfile(snapshot_file) + and os.path.isfile(dump_snapshot_file) + and os.path.isfile(wal_file) + and os.path.isfile(dump_wal_file) + ): test_dirs.append(test_dir_path) else: - raise Exception("Missing data in test directory '{}'" - .format(test_dir_path)) + raise Exception( + "Missing data in test directory '{}'".format(test_dir_path) + ) return test_dirs @@ -161,9 +169,10 @@ if __name__ == "__main__": parser.add_argument("--memgraph", default=memgraph_binary) parser.add_argument("--dump", default=dump_binary) parser.add_argument( - '--write-expected', - action='store_true', - help='Overwrite the expected cypher with results from current run') + "--write-expected", + action="store_true", + help="Overwrite the expected cypher with results from current run", + ) args = parser.parse_args() test_directories = find_test_directories(TESTS_DIR) @@ -171,16 +180,10 @@ if __name__ == "__main__": for test_directory in test_directories: execute_test( - args.memgraph, - args.dump, - test_directory, - "SNAPSHOT", - args.write_expected) + args.memgraph, args.dump, test_directory, "SNAPSHOT", args.write_expected + ) execute_test( - args.memgraph, - args.dump, - test_directory, - "WAL", - args.write_expected) + args.memgraph, args.dump, test_directory, "WAL", args.write_expected + ) sys.exit(0) diff --git a/tests/integration/ldap/runner.py b/tests/integration/ldap/runner.py index 6b0446690..a4dc97a7e 100755 --- a/tests/integration/ldap/runner.py +++ b/tests/integration/ldap/runner.py @@ -20,6 +20,8 @@ import sys import tempfile import time +from gqlalchemy import wait_for_port + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) @@ -45,13 +47,6 @@ roles: """ -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - def execute_tester(binary, queries, username="", password="", auth_should_fail=False, query_should_fail=False): if password == "": @@ -121,7 +116,7 @@ class Memgraph: time.sleep(0.1) assert self._process.poll() is None, "Memgraph process died " \ "prematurely!" - wait_for_server(7687) + wait_for_port(port=7687) def stop(self, check=True): if self._process is None: @@ -394,7 +389,7 @@ if __name__ == "__main__": slapd = subprocess.Popen(slapd_args) time.sleep(0.1) assert slapd.poll() is None, "slapd process died prematurely!" - wait_for_server(1389) + wait_for_port(port=1389) # Register cleanup function @atexit.register diff --git a/tests/integration/mg_import_csv/runner.py b/tests/integration/mg_import_csv/runner.py index afaa58e28..c32aebad9 100755 --- a/tests/integration/mg_import_csv/runner.py +++ b/tests/integration/mg_import_csv/runner.py @@ -20,19 +20,14 @@ import tempfile import time import yaml +from gqlalchemy import wait_for_port + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) BUILD_DIR = os.path.join(BASE_DIR, "build") -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - def extract_rows(data): return list(map(lambda x: x.strip(), data.strip().split("\n"))) @@ -60,7 +55,7 @@ def verify_lifetime(memgraph_binary, mg_import_csv_binary): memgraph = subprocess.Popen(list(map(str, memgraph_args))) time.sleep(0.1) assert memgraph.poll() is None, "Memgraph process died prematurely!" - wait_for_server(7687) + wait_for_port(port=7687) # Register cleanup function @atexit.register @@ -141,7 +136,7 @@ def execute_test(name, test_path, test_config, memgraph_binary, memgraph = subprocess.Popen(list(map(str, memgraph_args))) time.sleep(0.1) assert memgraph.poll() is None, "Memgraph process died prematurely!" - wait_for_server(7687) + wait_for_port(port=7687) # Register cleanup function @atexit.register diff --git a/tests/macro_benchmark/databases.py b/tests/macro_benchmark/databases.py index 439464c85..3e0fcedac 100644 --- a/tests/macro_benchmark/databases.py +++ b/tests/macro_benchmark/databases.py @@ -11,13 +11,12 @@ import logging import os -import subprocess from argparse import ArgumentParser -from collections import defaultdict import tempfile import shutil -import time from common import get_absolute_path, set_cpus +from gqlalchemy import wait_for_port + try: import jail @@ -25,24 +24,18 @@ except: import jail_faker as jail -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - class Memgraph: """ Knows how to start and stop memgraph. """ + def __init__(self, args, num_workers): self.log = logging.getLogger("MemgraphRunner") argp = ArgumentParser("MemgraphArgumentParser") - argp.add_argument("--runner-bin", - default=get_absolute_path("memgraph", "build")) - argp.add_argument("--port", default="7687", - help="Database and client port") + argp.add_argument( + "--runner-bin", default=get_absolute_path("memgraph", "build") + ) + argp.add_argument("--port", default="7687", help="Database and client port") argp.add_argument("--data-directory", default=None) argp.add_argument("--storage-snapshot-on-exit", action="store_true") argp.add_argument("--storage-recover-on-startup", action="store_true") @@ -55,8 +48,12 @@ class Memgraph: def start(self): self.log.info("start") - database_args = ["--bolt-port", self.args.port, - "--query-execution-timeout-sec", "0"] + database_args = [ + "--bolt-port", + self.args.port, + "--query-execution-timeout-sec", + "0", + ] if self.num_workers: database_args += ["--bolt-num-workers", str(self.num_workers)] if self.args.data_directory: @@ -71,7 +68,7 @@ class Memgraph: # start memgraph self.database_bin.run(runner_bin, database_args, timeout=600) - wait_for_server(self.args.port) + wait_for_port(port=self.args.port) def stop(self): self.database_bin.send_signal(jail.SIGTERM) @@ -82,15 +79,17 @@ class Neo: """ Knows how to start and stop neo4j. """ + def __init__(self, args, config): self.log = logging.getLogger("NeoRunner") argp = ArgumentParser("NeoArgumentParser") - argp.add_argument("--runner-bin", default=get_absolute_path( - "neo4j/bin/neo4j", "libs")) - argp.add_argument("--port", default="7687", - help="Database and client port") - argp.add_argument("--http-port", default="7474", - help="Database and client port") + argp.add_argument( + "--runner-bin", default=get_absolute_path("neo4j/bin/neo4j", "libs") + ) + argp.add_argument("--port", default="7687", help="Database and client port") + argp.add_argument( + "--http-port", default="7474", help="Database and client port" + ) self.log.info("Initializing Runner with arguments %r", args) self.args, _ = argp.parse_known_args(args) self.config = config @@ -105,29 +104,36 @@ class Neo: self.neo4j_home_path = tempfile.mkdtemp(dir="/dev/shm") try: - os.symlink(os.path.join(get_absolute_path("neo4j", "libs"), "lib"), - os.path.join(self.neo4j_home_path, "lib")) + os.symlink( + os.path.join(get_absolute_path("neo4j", "libs"), "lib"), + os.path.join(self.neo4j_home_path, "lib"), + ) neo4j_conf_dir = os.path.join(self.neo4j_home_path, "conf") neo4j_conf_file = os.path.join(neo4j_conf_dir, "neo4j.conf") os.mkdir(neo4j_conf_dir) shutil.copyfile(self.config, neo4j_conf_file) with open(neo4j_conf_file, "a") as f: - f.write("\ndbms.connector.bolt.listen_address=:" + - self.args.port + "\n") - f.write("\ndbms.connector.http.listen_address=:" + - self.args.http_port + "\n") + f.write( + "\ndbms.connector.bolt.listen_address=:" + self.args.port + "\n" + ) + f.write( + "\ndbms.connector.http.listen_address=:" + + self.args.http_port + + "\n" + ) # environment cwd = os.path.dirname(self.args.runner_bin) env = {"NEO4J_HOME": self.neo4j_home_path} - self.database_bin.run(self.args.runner_bin, args=["console"], - env=env, timeout=600, cwd=cwd) + self.database_bin.run( + self.args.runner_bin, args=["console"], env=env, timeout=600, cwd=cwd + ) except: shutil.rmtree(self.neo4j_home_path) raise Exception("Couldn't run Neo4j!") - wait_for_server(self.args.http_port, 2.0) + wait_for_port(port=self.args.http_port, delay=2.0) def stop(self): self.database_bin.send_signal(jail.SIGTERM) diff --git a/tests/mgbench/runners.py b/tests/mgbench/runners.py index 891a7cddd..585addb5f 100644 --- a/tests/mgbench/runners.py +++ b/tests/mgbench/runners.py @@ -17,13 +17,7 @@ import subprocess import tempfile import time - -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - +from gqlalchemy import wait_for_port def _convert_args_to_flags(*args, **kwargs): flags = list(args) @@ -92,7 +86,7 @@ class Memgraph: if self._proc_mg.poll() is not None: self._proc_mg = None raise Exception("The database process died prematurely!") - wait_for_server(7687) + wait_for_port(port=7687) ret = self._proc_mg.poll() assert ret is None, "The database process died prematurely " \ "({})!".format(ret) diff --git a/tests/public_benchmark/ldbc/run_benchmark b/tests/public_benchmark/ldbc/run_benchmark index 63048854b..491662a4a 100755 --- a/tests/public_benchmark/ldbc/run_benchmark +++ b/tests/public_benchmark/ldbc/run_benchmark @@ -15,17 +15,12 @@ import subprocess import tempfile import time +from gqlalchemy import wait_for_port + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) -def wait_for_server(port, delay=1.0): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.5) - time.sleep(delay) - - class Memgraph: def __init__(self, dataset, port, num_workers): self.proc = None @@ -51,7 +46,7 @@ class Memgraph: # start memgraph self.proc = subprocess.Popen(database_args, env=env) - wait_for_server(self.port) + wait_for_port(port=self.port, delay=1.0) def stop(self): self.proc.terminate() @@ -99,7 +94,7 @@ class Neo: shutil.rmtree(self.home_dir) raise Exception("Couldn't run Neo4j!") - wait_for_server(self.http_port, 2.0) + wait_for_port(port=self.http_port, delay=2.0) def stop(self): self.proc.terminate() @@ -112,7 +107,7 @@ class Neo: def parse_args(): argp = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) argp.add_argument('--scale', type=int, default=1, help='Dataset scale to use for benchmarking.') argp.add_argument('--host', default='127.0.0.1', help='Database host.') @@ -196,11 +191,13 @@ def main(): '-p', 'host', args.host, '-p', 'port', args.port, '-db', 'net.ellitron.ldbcsnbimpls.interactive.neo4j.Neo4jDb', '-p', 'ldbc.snb.interactive.parameters_dir', parameters_dir, - '--time_compression_ratio', str(args.time_compression_ratio), + '--time_compression_ratio', str( + args.time_compression_ratio), '--operation_count', str(args.operation_count), '--thread_count', str(args.thread_count), '--time_unit', args.time_unit.upper()) - subprocess.check_call(java_cmd, cwd=os.path.join(SCRIPT_DIR, 'ldbc_driver')) + subprocess.check_call( + java_cmd, cwd=os.path.join(SCRIPT_DIR, 'ldbc_driver')) # Copy the results to results dir. ldbc_results = os.path.join(SCRIPT_DIR, 'ldbc_driver', 'results', diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration index 697f5158d..8d4470217 100755 --- a/tests/stress/continuous_integration +++ b/tests/stress/continuous_integration @@ -3,13 +3,13 @@ import argparse import atexit -import json import multiprocessing import os import subprocess -import sys import time +from gqlalchemy import wait_for_port + # dataset calibrated for running on Apollo (total 4min) # bipartite.py runs for approx. 30s # create_match.py runs for approx. 30s @@ -87,13 +87,6 @@ else: THREADS = multiprocessing.cpu_count() -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - # run test helper function def run_test(args, test, options, timeout): print("Running test '{}'".format(test)) @@ -102,7 +95,7 @@ def run_test(args, test, options, timeout): if test.endswith(".py"): logging = "DEBUG" if args.verbose else "WARNING" binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test), - "--logging", logging] + "--logging", logging] elif test.endswith(".cpp"): exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4]) binary = [exe] @@ -112,11 +105,11 @@ def run_test(args, test, options, timeout): # start test cmd = binary + ["--worker-count", str(THREADS)] + options start = time.time() - ret_test = subprocess.run(cmd, cwd = SCRIPT_DIR, timeout = timeout * 60) + ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60) if ret_test.returncode != 0: raise Exception("Test '{}' binary returned non-zero ({})!".format( - test, ret_test.returncode)) + test, ret_test.returncode)) runtime = time.time() - start print(" Done after {:.3f} seconds".format(runtime)) @@ -125,19 +118,19 @@ def run_test(args, test, options, timeout): # parse arguments -parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.") -parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR, - "memgraph")) -parser.add_argument("--log-file", default = "") -parser.add_argument("--data-directory", default = "") -parser.add_argument("--python", default = os.path.join(SCRIPT_DIR, - "ve3", "bin", "python3"), type = str) -parser.add_argument("--large-dataset", action = "store_const", - const = True, default = False) -parser.add_argument("--use-ssl", action = "store_const", - const = True, default = False) -parser.add_argument("--verbose", action = "store_const", - const = True, default = False) +parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.") +parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, + "memgraph")) +parser.add_argument("--log-file", default="") +parser.add_argument("--data-directory", default="") +parser.add_argument("--python", default=os.path.join(SCRIPT_DIR, + "ve3", "bin", "python3"), type=str) +parser.add_argument("--large-dataset", action="store_const", + const=True, default=False) +parser.add_argument("--use-ssl", action="store_const", + const=True, default=False) +parser.add_argument("--verbose", action="store_const", + const=True, default=False) args = parser.parse_args() # generate temporary SSL certs @@ -145,8 +138,8 @@ if args.use_ssl: # https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com" subprocess.run(["openssl", "req", "-new", "-newkey", "rsa:4096", - "-days", "365", "-nodes", "-x509", "-subj", subj, - "-keyout", KEY_FILE, "-out", CERT_FILE], check=True) + "-days", "365", "-nodes", "-x509", "-subj", subj, + "-keyout", KEY_FILE, "-out", CERT_FILE], check=True) # start memgraph cwd = os.path.dirname(args.memgraph) @@ -166,18 +159,22 @@ if args.data_directory: cmd += ["--data-directory", args.data_directory] if args.use_ssl: cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE] -proc_mg = subprocess.Popen(cmd, cwd = cwd) -wait_for_server(7687) +proc_mg = subprocess.Popen(cmd, cwd=cwd) +wait_for_port(port=7687, delay=0.1) assert proc_mg.poll() is None, "The database binary died prematurely!" # at exit cleanup + + @atexit.register def cleanup(): global proc_mg - if proc_mg.poll() != None: return + if proc_mg.poll() != None: + return proc_mg.kill() proc_mg.wait() + # run tests runtimes = {} dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET diff --git a/tests/stress/durability b/tests/stress/durability index 4e09281d2..4cf2150e4 100755 --- a/tests/stress/durability +++ b/tests/stress/durability @@ -17,6 +17,7 @@ import time import threading from common import connection_argument_parser, SessionCache +from gqlalchemy import wait_for_port from multiprocessing import Pool, Manager # Constants and args @@ -83,18 +84,11 @@ def clean_memgraph(): proc_mg.wait() -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - def run_memgraph(): global proc_mg proc_mg = subprocess.Popen(cmd, cwd=cwd) # Wait for Memgraph to finish the recovery process - wait_for_server(args.endpoint.split(":")[1]) + wait_for_port(port=args.endpoint.split(":")[1], delay=0.1) def run_client(id, data):