75950664a7
Summary: This diff splits single-node and distributed storage from each other. Currently all of the storage code is copied into two directories (one single-node, one distributed). The logic of the storage implementation isn't touched; it will be refactored in following diffs. To clean the working directory after this diff you should execute:
```
rm database/state_delta.capnp
rm database/state_delta.hpp
rm storage/concurrent_id_mapper_rpc_messages.capnp
rm storage/concurrent_id_mapper_rpc_messages.hpp
```
Reviewers: teon.banek, buda, msantl
Reviewed By: teon.banek, msantl
Subscribers: teon.banek, pullbot
Differential Revision: https://phabricator.memgraph.io/D1625
# -*- coding: utf-8 -*-

import atexit
import datetime
import json
import logging
import os
import subprocess
import sys
import tempfile
import time
from fcntl import fcntl, F_GETFL, F_SETFL

from steps.test_parameters import TestParameters
from neo4j.v1 import GraphDatabase, basic_auth
from steps.graph_properties import GraphProperties
from test_results import TestResults

# Constants - Memgraph flags
COMMON_FLAGS = ["--durability-enabled=false",
                "--snapshot-on-exit=false",
                "--db-recover-on-startup=false"]
DISTRIBUTED_FLAGS = ["--num-workers", str(6),
                     "--rpc-num-client-workers", str(6),
                     "--rpc-num-server-workers", str(6)]
MASTER_FLAGS = ["--master",
                "--master-port", "10000"]
MEMGRAPH_PORT = 7687

# Module-scoped variables
test_results = TestResults()
temporary_directory = tempfile.TemporaryDirectory()


# Helper functions
def get_script_path():
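    """Returns the absolute directory containing this script."""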
    return os.path.dirname(os.path.realpath(__file__))


def start_process(cmd, stdout=subprocess.DEVNULL,
                  stderr=subprocess.PIPE, **kwargs):
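    """
    Starts a subprocess; stderr is made non-blocking so it can be
    drained later (see after_scenario) without stalling the test run.
    """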
    ret = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, **kwargs)
    # set the O_NONBLOCK flag of process stderr file descriptor
    if stderr == subprocess.PIPE:
        flags = fcntl(ret.stderr, F_GETFL)  # get current stderr flags
        fcntl(ret.stderr, F_SETFL, flags | os.O_NONBLOCK)
    return ret


def is_tested_system_active(context):
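    """Returns True if all spawned memgraph processes are still running."""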
    return all(proc.poll() is None for proc in context.memgraph_processes)


def is_tested_system_inactive(context):
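    """Returns True if all spawned memgraph processes have exited."""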
    return not any(proc.poll() is None for proc in context.memgraph_processes)


def get_worker_flags(worker_id):
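    """
    Builds the command-line flags for a worker. Worker `worker_id`
    listens on port 10000 + worker_id; the master listens on 10000.
    """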
    flags = ["--worker",
             "--worker-id", str(worker_id),
             "--worker-port", str(10000 + worker_id),
             "--master-port", str(10000)]
    return flags


def wait_for_server(port, delay=0.01):
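    """
    Blocks until a TCP server accepts connections on `port`.

    Polls with `nc -z` every `delay` seconds and aborts the run after
    roughly 20 seconds without a successful connection.
    """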
    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    count = 0
    while subprocess.call(cmd) != 0:
        if count > 20 / delay:
            print("Could not wait for server on port", port, "to startup!")
            sys.exit(1)
        count += 1
        time.sleep(delay)
    time.sleep(delay)


def run_memgraph(context, flags, distributed):
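    """
    Spawns a single memgraph (or memgraph_distributed) process with the
    given flags and registers it on the context.
    """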
    if distributed:
        memgraph_binary = "memgraph_distributed"
    else:
        memgraph_binary = "memgraph"
    memgraph_cmd = [os.path.join(context.memgraph_dir, memgraph_binary)]
    memgraph_subprocess = start_process(memgraph_cmd + flags)
    context.memgraph_processes.append(memgraph_subprocess)


def start_memgraph(context):
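    """
    Starts the system under test: in distributed mode one master plus
    num_machines - 1 workers, otherwise a single memgraph instance. Each
    process gets its own durability directory inside the temporary
    directory.
    """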
    if context.config.distributed:  # Run distributed
        flags = COMMON_FLAGS.copy()
        if context.config.memgraph_params:
            flags += context.extra_flags
        master_flags = flags.copy()
        master_flags.append("--durability-directory=" + os.path.join(
            temporary_directory.name, "master"))
        run_memgraph(context, master_flags + DISTRIBUTED_FLAGS + MASTER_FLAGS,
                     context.config.distributed)
        for i in range(1, int(context.config.num_machines)):
            worker_flags = flags.copy()
            worker_flags.append("--durability-directory=" + os.path.join(
                temporary_directory.name, "worker" + str(i)))
            run_memgraph(context, worker_flags + DISTRIBUTED_FLAGS +
                         get_worker_flags(i), context.config.distributed)
    else:  # Run single machine memgraph
        flags = COMMON_FLAGS.copy()
        if context.config.memgraph_params:
            flags += context.extra_flags
        flags.append("--durability-directory=" + temporary_directory.name)
        run_memgraph(context, flags, context.config.distributed)
    assert is_tested_system_active(context), "Failed to start memgraph"
    wait_for_server(MEMGRAPH_PORT)  # wait for memgraph to start


def cleanup(context):
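    """Kills any running memgraph processes and waits for them to exit."""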
    if context.config.database == "memgraph":
        for proc in context.memgraph_processes:
            proc.kill()
        for proc in context.memgraph_processes:
            proc.wait()
        assert is_tested_system_inactive(context), "Failed to stop memgraph"
        context.memgraph_processes.clear()


def get_test_suite(context):
    """
    Returns the test suite name derived from the test root folder.

    If the test root is a feature file, the name of the file is
    returned without the .feature extension.
    """
    root = context.config.root

    if root.endswith("/"):
        root = root[:-1]
    if root.endswith("features"):
        root = root[:-(len("features") + 1)]

    test_suite = root.split('/')[-1]

    return test_suite


def set_logging(context):
    """
    Initializes logging and sets the logging level to DEBUG.
    """
    logging.basicConfig(level="DEBUG")
    log = logging.getLogger(__name__)
    context.log = log


def create_db_driver(context):
    """
    Creates a database driver and returns it.
    """
    uri = context.config.database_uri
    auth_token = basic_auth(
        context.config.database_username, context.config.database_password)
    if context.config.database in ("neo4j", "memgraph"):
        driver = GraphDatabase.driver(uri, auth=auth_token, encrypted=False)
    else:
        raise ValueError("Unsupported database type")
    return driver


# Behave specific functions
def before_step(context, step):
    """
    Executes before every step. Checks whether the step is the
    execution step and sets a context variable to True if it is.
    """
    context.execution_step = False
    if step.name == "executing query":
        context.execution_step = True


def before_scenario(context, scenario):
    """
    Executes before every scenario. Initializes test parameters,
    graph properties, exception and test execution time.
    """
    if context.config.database == "memgraph":
        # Check if memgraph is up and running; restart it if it died.
        if is_tested_system_active(context):
            context.is_tested_system_restarted = False
        else:
            cleanup(context)
            start_memgraph(context)
            context.is_tested_system_restarted = True
    context.test_parameters = TestParameters()
    context.graph_properties = GraphProperties()
    context.exception = None
    context.execution_time = None


def before_all(context):
    """
    Executes before running tests. Initializes the driver and the
    latency dict and creates the needed directories.
    """
    timestamp = datetime.datetime.fromtimestamp(
        time.time()).strftime("%Y_%m_%d__%H_%M_%S")
    latency_file = "latency/" + context.config.database + "/" + \
        get_test_suite(context) + "/" + timestamp + ".json"
    if not os.path.exists(os.path.dirname(latency_file)):
        os.makedirs(os.path.dirname(latency_file))
    context.latency_file = latency_file
    context.js = dict()
    context.js["metadata"] = dict()
    context.js["metadata"]["execution_time_unit"] = "seconds"
    context.js["data"] = dict()
    set_logging(context)
    # set config for memgraph
    context.memgraph_processes = []
    script_path = get_script_path()
    context.memgraph_dir = os.path.realpath(
        os.path.join(script_path, "../../../build"))
    # Fall back to the debug build directory if no release build exists.
    if not os.path.exists(context.memgraph_dir):
        context.memgraph_dir = os.path.realpath(
            os.path.join(script_path, "../../../build_debug"))
    if context.config.memgraph_params:
        params = context.config.memgraph_params.strip("\"")
        context.extra_flags = params.split()
    atexit.register(cleanup, context)
    if context.config.database == "memgraph":
        start_memgraph(context)
    context.driver = create_db_driver(context)


def after_scenario(context, scenario):
    """
    Executes after every scenario. Pauses execution if flags are set.
    Adds the execution time to the latency dict if it is not None.
    """
    err_output = [p.stderr.read()
                  for p in context.memgraph_processes]
    # print error output for each subprocess if scenario failed
    if scenario.status == "failed":
        for i, err in enumerate(err_output):
            if err:
                err = err.decode("utf-8")
                print("\n", "-" * 5, "Machine {}".format(i), "-" * 5)
                for line in err.splitlines():
                    print(line)
    test_results.add_test(scenario.status, context.is_tested_system_restarted)
    if context.config.single_scenario or \
            (context.config.single_fail and scenario.status == "failed"):
        print("Press enter to continue")
        sys.stdin.readline()

    if context.execution_time is not None:
        context.js['data'][scenario.name] = {
            "execution_time": context.execution_time, "status": scenario.status
        }


def after_feature(context, feature):
    """
    Executes after every feature. If the flag is set, pauses before
    executing the next feature.
    """
    if context.config.single_feature:
        print("Press enter to continue")
        sys.stdin.readline()


def after_all(context):
    """
    Executes when testing is finished. Creates JSON files of test latency
    and test results.
    """
    context.driver.close()
    timestamp = datetime.datetime.fromtimestamp(
        time.time()).strftime("%Y_%m_%d__%H_%M")

    test_suite = get_test_suite(context)
    file_name = context.config.output_folder + timestamp + \
        "-" + context.config.database + "-" + context.config.test_name + \
        ".json"

    js = {
        "total": test_results.num_total(),
        "passed": test_results.num_passed(),
        "restarts": test_results.num_restarts(),
        "test_suite": test_suite,
        "timestamp": timestamp,
        "db": context.config.database
    }
    with open(file_name, 'w') as f:
        json.dump(js, f)

    with open(context.latency_file, "a") as f:
        json.dump(context.js, f)