memgraph/tests/qa/tck_engine/environment.py
Matej Ferencevic 9a7d25f6a5 Fix distributed startup and QA test
Reviewers: msantl, teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1674
2018-10-18 10:34:05 +02:00

293 lines
9.8 KiB
Python

# -*- coding: utf-8 -*-
import atexit
import datetime
import json
import logging
import os
import subprocess
import sys
import tempfile
import time
from fcntl import fcntl, F_GETFL, F_SETFL
from steps.test_parameters import TestParameters
from neo4j.v1 import GraphDatabase, basic_auth
from steps.graph_properties import GraphProperties
from test_results import TestResults
# Constants - Memgraph flags
COMMON_FLAGS = ["--durability-enabled=false",
"--snapshot-on-exit=false",
"--db-recover-on-startup=false"]
DISTRIBUTED_FLAGS = ["--num-workers", str(6),
"--rpc-num-client-workers", str(6),
"--rpc-num-server-workers", str(6)]
MASTER_PORT = 10000
MASTER_FLAGS = ["--master",
"--master-port", str(MASTER_PORT)]
MEMGRAPH_PORT = 7687
# Module-scoped variables
test_results = TestResults()
temporary_directory = tempfile.TemporaryDirectory()
# Helper functions
def get_script_path():
return os.path.dirname(os.path.realpath(__file__))
def start_process(cmd, stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE, **kwargs):
ret = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, **kwargs)
# set the O_NONBLOCK flag of process stderr file descriptor
if stderr == subprocess.PIPE:
flags = fcntl(ret.stderr, F_GETFL) # get current stderr flags
fcntl(ret.stderr, F_SETFL, flags | os.O_NONBLOCK)
return ret
def is_tested_system_active(context):
return all(proc.poll() is None for proc in context.memgraph_processes)
def is_tested_system_inactive(context):
return not any(proc.poll() is None for proc in context.memgraph_processes)
def get_worker_flags(worker_id):
flags = ["--worker",
"--worker-id", str(worker_id),
"--worker-port", str(10000 + worker_id),
"--master-port", str(10000)]
return flags
def wait_for_server(port, delay=0.01):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
count = 0
while subprocess.call(cmd) != 0:
time.sleep(0.01)
if count > 20 / 0.01:
print("Could not wait for server on port", port, "to startup!")
sys.exit(1)
count += 1
time.sleep(delay)
def run_memgraph(context, flags, distributed):
if distributed:
memgraph_binary = "memgraph_distributed"
else:
memgraph_binary = "memgraph"
memgraph_cmd = [os.path.join(context.memgraph_dir, memgraph_binary)]
memgraph_subprocess = start_process(memgraph_cmd + flags)
context.memgraph_processes.append(memgraph_subprocess)
def start_memgraph(context):
if context.config.distributed: # Run distributed
flags = COMMON_FLAGS.copy()
if context.config.memgraph_params:
flags += context.extra_flags
master_flags = flags.copy()
master_flags.append("--durability-directory=" + os.path.join(
temporary_directory.name, "master"))
run_memgraph(context, master_flags + DISTRIBUTED_FLAGS + MASTER_FLAGS,
context.config.distributed)
wait_for_server(MASTER_PORT, 0.5)
for i in range(1, int(context.config.num_machines)):
worker_flags = flags.copy()
worker_flags.append("--durability-directory=" + os.path.join(
temporary_directory.name, "worker" + str(i)))
run_memgraph(context, worker_flags + DISTRIBUTED_FLAGS +
get_worker_flags(i), context.config.distributed)
wait_for_server(MASTER_PORT + i, 0.5)
else: # Run single machine memgraph
flags = COMMON_FLAGS.copy()
if context.config.memgraph_params:
flags += context.extra_flags
flags.append("--durability-directory=" + temporary_directory.name)
run_memgraph(context, flags, context.config.distributed)
assert is_tested_system_active(context), "Failed to start memgraph"
wait_for_server(MEMGRAPH_PORT, 0.5) # wait for memgraph to start
def cleanup(context):
if context.config.database == "memgraph":
list(map(lambda p: p.kill(), context.memgraph_processes))
list(map(lambda p: p.wait(), context.memgraph_processes))
assert is_tested_system_inactive(context), "Failed to stop memgraph"
context.memgraph_processes.clear()
def get_test_suite(context):
"""
Returns test suite from a test root folder.
If test root is a feature file, name of file is returned without
.feature extension.
"""
root = context.config.root
if root.endswith("/"):
root = root[0:len(root) - 1]
if root.endswith("features"):
root = root[0: len(root) - len("features") - 1]
test_suite = root.split('/')[-1]
return test_suite
def set_logging(context):
"""
Initializes log and sets logging level to debug.
"""
logging.basicConfig(level="DEBUG")
log = logging.getLogger(__name__)
context.log = log
def create_db_driver(context):
"""
Creates database driver and returns it.
"""
uri = context.config.database_uri
auth_token = basic_auth(
context.config.database_username, context.config.database_password)
if context.config.database == "neo4j" or \
context.config.database == "memgraph":
driver = GraphDatabase.driver(uri, auth=auth_token, encrypted=0)
else:
raise "Unsupported database type"
return driver
# Behave specific functions
def before_step(context, step):
"""
Executes before every step. Checks if step is execution
step and sets context variable to true if it is.
"""
context.execution_step = False
if step.name == "executing query":
context.execution_step = True
def before_scenario(context, scenario):
"""
Executes before every scenario. Initializes test parameters,
graph properties, exception and test execution time.
"""
if context.config.database == "memgraph":
# Check if memgraph is up and running
if is_tested_system_active(context):
context.is_tested_system_restarted = False
else:
cleanup(context)
start_memgraph(context)
context.is_tested_system_restarted = True
context.test_parameters = TestParameters()
context.graph_properties = GraphProperties()
context.exception = None
context.execution_time = None
def before_all(context):
"""
Executes before running tests. Initializes driver and latency
dict and creates needed directories.
"""
timestamp = datetime.datetime.fromtimestamp(
time.time()).strftime("%Y_%m_%d__%H_%M_%S")
latency_file = "latency/" + context.config.database + "/" + \
get_test_suite(context) + "/" + timestamp + ".json"
if not os.path.exists(os.path.dirname(latency_file)):
os.makedirs(os.path.dirname(latency_file))
context.latency_file = latency_file
context.js = dict()
context.js["metadata"] = dict()
context.js["metadata"]["execution_time_unit"] = "seconds"
context.js["data"] = dict()
set_logging(context)
# set config for memgraph
context.memgraph_processes = []
script_path = get_script_path()
context.memgraph_dir = os.path.realpath(
os.path.join(script_path, "../../../build"))
if not os.path.exists(context.memgraph_dir):
context.memgraph_dir = os.path.realpath(
os.path.join(script_path, "../../../build_debug"))
if context.config.memgraph_params:
params = context.config.memgraph_params.strip("\"")
context.extra_flags = params.split()
atexit.register(cleanup, context)
if context.config.database == "memgraph":
start_memgraph(context)
context.driver = create_db_driver(context)
def after_scenario(context, scenario):
"""
Executes after every scenario. Pauses execution if flags are set.
Adds execution time to latency dict if it is not None.
"""
err_output = [p.stderr.read() # noqa unused variable
for p in context.memgraph_processes]
# print error output for each subprocess if scenario failed
if scenario.status == "failed":
for i, err in enumerate(err_output):
if err:
err = err.decode("utf-8")
print("\n", "-" * 5, "Machine {}".format(i), "-" * 5)
list(map(print, [line for line in err.splitlines()]))
test_results.add_test(scenario.status, context.is_tested_system_restarted)
if context.config.single_scenario or \
(context.config.single_fail and scenario.status == "failed"):
print("Press enter to continue")
sys.stdin.readline()
if context.execution_time is not None:
context.js['data'][scenario.name] = {
"execution_time": context.execution_time, "status": scenario.status
}
def after_feature(context, feature):
"""
Executes after every feature. If flag is set, pauses before
executing next scenario.
"""
if context.config.single_feature:
print("Press enter to continue")
sys.stdin.readline()
def after_all(context):
"""
Executes when testing is finished. Creates JSON files of test latency
and test results.
"""
context.driver.close()
timestamp = datetime.datetime.fromtimestamp(
time.time()).strftime("%Y_%m_%d__%H_%M")
test_suite = get_test_suite(context)
file_name = context.config.output_folder + timestamp + \
"-" + context.config.database + "-" + context.config.test_name + \
".json"
js = {
"total": test_results.num_total(),
"passed": test_results.num_passed(),
"restarts": test_results.num_restarts(),
"test_suite": test_suite,
"timestamp": timestamp,
"db": context.config.database
}
with open(file_name, 'w') as f:
json.dump(js, f)
with open(context.latency_file, "a") as f:
json.dump(context.js, f)