diff --git a/config/durability_stress.conf b/config/durability_stress.conf
new file mode 100644
index 000000000..09141eb91
--- /dev/null
+++ b/config/durability_stress.conf
@@ -0,0 +1,15 @@
+# MEMGRAPH DEFAULT DURABILITY STRESS TESTING CONFIG
+
+# NOTE: all paths are relative to the run folder
+# (where the executable is run)
+
+# enable durability
+--durability-enabled=true
+--snapshot-cycle-sec=5
+--snapshot-on-exit=false
+--snapshot-max-retained=2
+
+--db-recover-on-startup=true
+
+# increase query timeout (10 min)
+--query-execution-time-sec=600
diff --git a/tests/stress/apollo_runs.yaml b/tests/stress/apollo_runs.yaml
index 44de4de42..33b242245 100644
--- a/tests/stress/apollo_runs.yaml
+++ b/tests/stress/apollo_runs.yaml
@@ -11,3 +11,16 @@
   commands: TIMEOUT=43200 ./continuous_integration --large-dataset
   infiles: *STRESS_INFILES
   slave_group: remote_16c128g
+
+- name: durability
+  commands: TIMEOUT=300 ./ve3/bin/python3 durability --num-steps 5
+  infiles: &DURABILITY_INFILES
+    - . # current directory
+    - ../../build_release/memgraph # memgraph release binary
+    - ../../config # directory with config files
+
+- name: durability_large
+  project: release
+  commands: TIMEOUT=3600 ./ve3/bin/python3 durability --num-steps 20
+  infiles: *DURABILITY_INFILES
+  slave_group: remote_16c128g
diff --git a/tests/stress/bipartite.py b/tests/stress/bipartite.py
index 7e6562fe1..d93c6f5ae 100644
--- a/tests/stress/bipartite.py
+++ b/tests/stress/bipartite.py
@@ -9,10 +9,10 @@ import logging
 import multiprocessing
 import time
 import atexit
-import os
 
-from common import connection_argument_parser, assert_equal, argument_driver, \
-    OutputData, execute_till_success, batch, render
+from common import connection_argument_parser, assert_equal, \
+    OutputData, execute_till_success, \
+    batch, render, SessionCache
 
 
 def parse_args():
@@ -47,28 +47,6 @@ def parse_args():
 args = parse_args()
 output_data = OutputData()
 
-# This class is used to create and cache sessions. Session is cached by args
-# used to create it and process' pid in which it was created. This makes it easy
-# to reuse session with python multiprocessing primitives like pmap.
-class SessionCache:
-    cache = {}
-
-    @staticmethod
-    def argument_session(args):
-        key = tuple(vars(args).items()) + (os.getpid(),)
-        if key in SessionCache.cache:
-            return SessionCache.cache[key][1]
-        driver = argument_driver(args)
-        session = driver.session()
-        SessionCache.cache[key] = (driver, session)
-        return session
-
-    @staticmethod
-    def cleanup():
-        for _, (driver, session) in SessionCache.cache.items():
-            session.close()
-            driver.close()
-
 atexit.register(SessionCache.cleanup)
diff --git a/tests/stress/common.py b/tests/stress/common.py
index 7cd9fffa3..32f4da136 100644
--- a/tests/stress/common.py
+++ b/tests/stress/common.py
@@ -8,6 +8,7 @@ Only Bolt communication protocol is supported.
 '''
 
 import contextlib
+import os
 from threading import Thread
 from time import sleep
 
@@ -191,6 +192,28 @@ def argument_driver(args, ssl=False):
                   auth=(args.username, str(args.password)), encrypted=ssl)
 
 
+# This class is used to create and cache sessions. A session is cached by the
+# args used to create it and the pid of the process in which it was created.
+# This makes it easy to reuse sessions with Python multiprocessing primitives
+# like pmap.
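+# NOTE: the driver is cached together with its session so that it stays alive
+# for as long as the session is in use; cleanup() closes each session before
+# its driver for the same reason.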
+class SessionCache:
+    cache = {}
+
+    @staticmethod
+    def argument_session(args):
+        key = tuple(vars(args).items()) + (os.getpid(),)
+        if key in SessionCache.cache:
+            return SessionCache.cache[key][1]
+        driver = argument_driver(args)
+        session = driver.session()
+        SessionCache.cache[key] = (driver, session)
+        return session
+
+    @staticmethod
+    def cleanup():
+        for _, (driver, session) in SessionCache.cache.items():
+            session.close()
+            driver.close()
+
 
 def periodically_execute(callable, args, interval, daemon=True):
     """
diff --git a/tests/stress/durability b/tests/stress/durability
new file mode 100755
index 000000000..ac2478431
--- /dev/null
+++ b/tests/stress/durability
@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+'''
+Durability Stress Test
+'''
+
+# TODO (mg_durability_stress_test): extend once we add full durability mode
+
+import atexit
+import multiprocessing
+import os
+import random
+import shutil
+import subprocess
+import time
+import threading
+
+from common import connection_argument_parser, SessionCache
+from multiprocessing import Pool, Manager
+
+# Constants and args
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
+BUILD_DIR = os.path.join(BASE_DIR, "build")
+CONFIG_DIR = os.path.join(BASE_DIR, "config")
+DURABILITY_DIR = os.path.join(BUILD_DIR, "durability")
+if "THREADS" in os.environ:
+    DB_WORKERS = os.environ["THREADS"]
+else:
+    DB_WORKERS = multiprocessing.cpu_count()
+# The snapshot interval has to stay in sync with the --snapshot-cycle-sec
+# value in the Memgraph config file
+SNAPSHOT_CYCLE_SEC = 5
+
+# Parse command line arguments
+parser = connection_argument_parser()
+
+parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR,
+                                                       "memgraph"))
+parser.add_argument("--config", default=os.path.join(CONFIG_DIR,
+                                                     "durability_stress.conf"))
+parser.add_argument("--log-file", default="")
+parser.add_argument("--verbose", action="store_const",
+                    const=True, default=False)
+parser.add_argument("--durability-directory", default=DURABILITY_DIR)
+parser.add_argument("--num-clients", type=int,
+                    default=multiprocessing.cpu_count())
+parser.add_argument("--num-steps", type=int, default=5)
+args = parser.parse_args()
+
+# Fall back to the release binary if the default one doesn't exist
+if not os.path.exists(args.memgraph):
+    args.memgraph = os.path.join(BASE_DIR, "build_release", "memgraph")
+
+# Memgraph run command construction
+cwd = os.path.dirname(args.memgraph)
+cmd = [args.memgraph, "--num-workers=" + str(DB_WORKERS)]
+if not args.verbose:
+    cmd += ["--min-log-level", "1"]
+if args.log_file:
+    cmd += ["--log-file", args.log_file]
+if args.durability_directory:
+    cmd += ["--durability-directory", args.durability_directory]
+
+data_manager = Manager()
+data = data_manager.dict()
+
+# Pretest cleanup
+if os.path.exists(DURABILITY_DIR):
+    shutil.rmtree(DURABILITY_DIR)
+
+# Handle to the running Memgraph process (None while the server is down);
+# initialized here so clean_memgraph() is safe even if Memgraph never starts
+proc_mg = None
+
+atexit.register(SessionCache.cleanup)
+
+
+@atexit.register
+def clean_memgraph():
+    global proc_mg
+    if proc_mg is None:
+        return
+    if proc_mg.poll() is not None:
+        return
+    proc_mg.kill()
+    proc_mg.wait()
+
+
+def wait_for_server(port, delay=0.1):
+    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port]
+    while subprocess.call(cmd) != 0:
+        time.sleep(0.01)
+    time.sleep(delay)
+
+
+def run_memgraph():
+    global proc_mg
+    proc_mg = subprocess.Popen(cmd, cwd=cwd,
+                               env={"MEMGRAPH_CONFIG": args.config})
+    # Wait for Memgraph to finish the recovery process
+    wait_for_server(args.endpoint.split(":")[1])
+
+
+def run_client(id, data):
+    # init
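+    # Sessions are cached per (args, pid) key, so every worker process in the
+    # pool below gets its own driver and session (see SessionCache in
+    # common.py).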
+    session = SessionCache.argument_session(args)
+    data.setdefault(id, 0)
+    counter = data[id]
+
+    # Check recovery for this client
+    num_nodes_db = session.run(
+        "MATCH (n:%s) RETURN count(n) AS cnt" %
+        ("Client%d" % id)).data()[0]["cnt"]
+    print("Client%d DB: %d; ACTUAL: %d" % (id, num_nodes_db, data[id]))
+
+    # Execute a new set of write queries
+    while True:
+        try:
+            session.run("CREATE (n:%s {id:%s}) RETURN n;" %
+                        ("Client%d" % id, counter)).consume()
+            counter += 1
+            if counter % 100000 == 0:
+                print("Client %d executed %d" % (id, counter))
+        except Exception:
+            print("DB isn't reachable anymore")
+            break
+    data[id] = counter
+    print("Client %d executed %d" % (id, counter))
+
+
+def run_step():
+    with Pool(args.num_clients) as p:
+        p.starmap(run_client, [(id, data)
+                               for id in range(1, args.num_clients + 1)])
+
+
+def main():
+    for step in range(1, args.num_steps + 1):
+        print("#### Step %d" % step)
+        run_memgraph()
+        thread = threading.Thread(target=run_step)
+        thread.daemon = True
+        thread.start()
+        # Kill Memgraph at an arbitrary point in time. Sleeping for at least
+        # 2 * SNAPSHOT_CYCLE_SEC ensures that at least one snapshot is
+        # generated, because we want to test the entire recovery process
+        # (snapshot + WAL). It also makes sense to run this test for a longer
+        # period of time with a small --snapshot-cycle-sec, so that Memgraph
+        # is sometimes killed in the middle of generating a snapshot.
+        time.sleep(
+            random.randint(2 * SNAPSHOT_CYCLE_SEC, 3 * SNAPSHOT_CYCLE_SEC))
+        clean_memgraph()
+
+    # Final check
+    run_memgraph()
+    session = SessionCache.argument_session(args)
+    num_nodes_db = \
+        session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"]
+    num_nodes = sum(data.values())
+    # Check that more than 99.9% of the data is recoverable.
+    # NOTE: the default WAL flush interval is 1ms.
+    # Once the full sync durability mode is introduced,
+    # this test has to be extended.
+    assert num_nodes_db > 0.999 * num_nodes, \
+        "Memgraph lost more than 0.1% of the data!"
+
+
+if __name__ == "__main__":
+    main()
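
Local run, for reference (a sketch: this mirrors the durability entry in
apollo_runs.yaml and assumes the ve3 virtualenv plus a memgraph binary in
build/ or build_release/, as resolved by the script above):

    cd tests/stress
    TIMEOUT=300 ./ve3/bin/python3 durability --num-steps 5

Each step starts Memgraph, lets the clients write for two to three snapshot
cycles, then kills the server; the next step verifies per-client recovery
before writing more, and the final check asserts that at least 99.9% of all
created nodes were recovered.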