Add durability stress test
Reviewers: dgleich, mferencevic
Reviewed By: dgleich, mferencevic
Subscribers: mferencevic, pullbot
Differential Revision: https://phabricator.memgraph.io/D1371
parent 261d50a02e · commit 44474b55e3
config/durability_stress.conf (new file, 15 lines)
@@ -0,0 +1,15 @@
# MEMGRAPH DEFAULT DURABILITY STRESS TESTING CONFIG

# NOTE: all paths are relative to the run folder
# (where the executable is run)

# enable durability
--durability-enabled=true
--snapshot-cycle-sec=5
--snapshot-on-exit=false
--snapshot-max-retained=2

--db-recover-on-startup=true

# increase query timeout (10 min)
--query-execution-time-sec=600
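The stress test added below hands this file to Memgraph through the MEMGRAPH_CONFIG environment variable rather than a command-line flag. A minimal launch sketch under assumed paths (the real test resolves them relative to the repository root):

import os
import subprocess

# Assumed locations; adjust to your checkout. Only MEMGRAPH_CONFIG and
# --num-workers are taken from the test script itself.
MEMGRAPH_BINARY = "build/memgraph"
CONFIG_FILE = "config/durability_stress.conf"

proc = subprocess.Popen(
    [MEMGRAPH_BINARY, "--num-workers=4"],
    env={**os.environ, "MEMGRAPH_CONFIG": CONFIG_FILE})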
@@ -11,3 +11,16 @@
  commands: TIMEOUT=43200 ./continuous_integration --large-dataset
  infiles: *STRESS_INFILES
  slave_group: remote_16c128g

- name: durability
  commands: TIMEOUT=300 ./ve3/bin/python3 durability --num-steps 5
  infiles: &DURABILITY_INFILES
    - . # current directory
    - ../../build_release/memgraph # memgraph release binary
    - ../../config # directory with config files

- name: durability_large
  project: release
  commands: TIMEOUT=3600 ./ve3/bin/python3 durability --num-steps 20
  infiles: *DURABILITY_INFILES
  slave_group: remote_16c128g
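Outside of Apollo, the same durability job can be exercised locally by running the new script directly. A sketch of the equivalent invocation, assuming the stress suite's ve3 virtualenv and a release build already exist in your checkout:

import subprocess

# Mirrors the "durability" job above; the cwd and virtualenv path are
# assumptions about the local checkout, not taken from this commit.
subprocess.check_call(
    ["./ve3/bin/python3", "durability", "--num-steps", "5"],
    cwd="tests/stress")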
@@ -9,10 +9,10 @@ import logging
import multiprocessing
import time
import atexit
import os

from common import connection_argument_parser, assert_equal, argument_driver, \
    OutputData, execute_till_success, batch, render
from common import connection_argument_parser, assert_equal, \
    OutputData, execute_till_success, \
    batch, render, SessionCache


def parse_args():
@@ -47,28 +47,6 @@ args = parse_args()
output_data = OutputData()


# This class creates and caches Bolt sessions. A session is cached by the
# args used to create it and the pid of the process it was created in, which
# makes it easy to reuse sessions with multiprocessing primitives like Pool.map.
class SessionCache:
    cache = {}

    @staticmethod
    def argument_session(args):
        key = tuple(vars(args).items()) + (os.getpid(),)
        if key in SessionCache.cache:
            return SessionCache.cache[key][1]
        driver = argument_driver(args)
        session = driver.session()
        SessionCache.cache[key] = (driver, session)
        return session

    @staticmethod
    def cleanup():
        for _, (driver, session) in SessionCache.cache.items():
            session.close()
            driver.close()

atexit.register(SessionCache.cleanup)


@@ -8,6 +8,7 @@ Only Bolt communication protocol is supported.
'''

import contextlib
import os
from threading import Thread
from time import sleep

@@ -191,6 +192,28 @@ def argument_driver(args, ssl=False):
        auth=(args.username, str(args.password)),
        encrypted=ssl)

# This class creates and caches Bolt sessions. A session is cached by the
# args used to create it and the pid of the process it was created in, which
# makes it easy to reuse sessions with multiprocessing primitives like Pool.map.
class SessionCache:
    cache = {}

    @staticmethod
    def argument_session(args):
        key = tuple(vars(args).items()) + (os.getpid(),)
        if key in SessionCache.cache:
            return SessionCache.cache[key][1]
        # Create the driver first, then a session from it; both are cached
        # so cleanup() can close them together.
        driver = argument_driver(args)
        session = driver.session()
        SessionCache.cache[key] = (driver, session)
        return session

    @staticmethod
    def cleanup():
        for _, (driver, session) in SessionCache.cache.items():
            session.close()
            driver.close()


def periodically_execute(callable, args, interval, daemon=True):
    """
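Because the cache key combines the parsed connection arguments with os.getpid(), each worker process opens its own driver and session on first use and then reuses them for the rest of its lifetime. An illustrative sketch of that reuse from a multiprocessing pool (the worker function and query are invented for the example, not part of the commit):

from multiprocessing import Pool

from common import SessionCache, connection_argument_parser

args = connection_argument_parser().parse_args()


def count_nodes(_):
    # The first call in each worker process creates a driver and session;
    # later calls in the same process get the cached pair back.
    session = SessionCache.argument_session(args)
    return session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"]


if __name__ == "__main__":
    with Pool(4) as pool:
        print(pool.map(count_nodes, range(4)))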
tests/stress/durability (new executable file, 166 lines)
@@ -0,0 +1,166 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''
Durability Stress Test
'''

# TODO (mg_durability_stress_test): extend once we add full durability mode

import atexit
import multiprocessing
import os
import random
import shutil
import subprocess
import time
import threading

from common import connection_argument_parser, SessionCache
from multiprocessing import Pool, Manager

# Constants and args
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
DURABILITY_DIR = os.path.join(BUILD_DIR, "durability")
if "THREADS" in os.environ:
    DB_WORKERS = os.environ["THREADS"]
else:
    DB_WORKERS = multiprocessing.cpu_count()
# The snapshot interval has to be in sync with Memgraph's parameter
# --snapshot-cycle-sec
SNAPSHOT_CYCLE_SEC = 5

# Parse command line arguments
parser = connection_argument_parser()

parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR,
                                                       "memgraph"))
parser.add_argument("--config", default=os.path.join(CONFIG_DIR,
                                                     "durability_stress.conf"))
parser.add_argument("--log-file", default="")
parser.add_argument("--verbose", action="store_const",
                    const=True, default=False)
parser.add_argument("--durability-directory", default=DURABILITY_DIR)
parser.add_argument("--num-clients", type=int,
                    default=multiprocessing.cpu_count())
parser.add_argument("--num-steps", type=int, default=5)
args = parser.parse_args()

# Find Memgraph directory (alternative location)
if not os.path.exists(args.memgraph):
    args.memgraph = os.path.join(BASE_DIR, "build_release", "memgraph")

# Memgraph run command construction
cwd = os.path.dirname(args.memgraph)
cmd = [args.memgraph, "--num-workers=" + str(DB_WORKERS)]
if not args.verbose:
    cmd += ["--min-log-level", "1"]
if args.log_file:
    cmd += ["--log-file", args.log_file]
if args.durability_directory:
    cmd += ["--durability-directory", args.durability_directory]

# Memgraph process handle; set by run_memgraph() and checked in clean_memgraph()
proc_mg = None

data_manager = Manager()
data = data_manager.dict()

# Pretest cleanup
if os.path.exists(DURABILITY_DIR):
    shutil.rmtree(DURABILITY_DIR)

atexit.register(SessionCache.cleanup)


@atexit.register
def clean_memgraph():
    global proc_mg
    if proc_mg is None:
        return
    if proc_mg.poll() is not None:
        return
    proc_mg.kill()
    proc_mg.wait()


def wait_for_server(port, delay=0.1):
    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port]
    while subprocess.call(cmd) != 0:
        time.sleep(0.01)
    time.sleep(delay)

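# NOTE (editorial sketch, not part of this commit): the readiness check above
# can also be done without the external `nc` binary, using only the standard
# library; it assumes a successful TCP connect to the Bolt port is enough.
def wait_for_server_no_nc(port, delay=0.1):
    import socket
    address = ("127.0.0.1", int(port))
    while True:
        try:
            socket.create_connection(address, timeout=1).close()
            break
        except OSError:
            time.sleep(0.01)
    time.sleep(delay)
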
def run_memgraph():
    global proc_mg
    proc_mg = subprocess.Popen(cmd, cwd=cwd,
                               env={"MEMGRAPH_CONFIG": args.config})
    # Wait for Memgraph to finish the recovery process
    wait_for_server(args.endpoint.split(":")[1])


def run_client(id, data):
    # init
    session = SessionCache.argument_session(args)
    data.setdefault(id, 0)
    counter = data[id]

    # Check recovery for this client
    num_nodes_db = session.run(
        "MATCH (n:%s) RETURN count(n) as cnt" %
        ("Client%d" % id)).data()[0]["cnt"]
    print("Client%d DB: %d; ACTUAL: %d" % (id, num_nodes_db, data[id]))

    # Execute a new set of write queries
    while True:
        try:
            session.run("CREATE (n:%s {id:%s}) RETURN n;" %
                        ("Client%d" % id, counter)).consume()
            counter += 1
            if counter % 100000 == 0:
                print("Client %d executed %d" % (id, counter))
        except Exception:
            print("DB isn't reachable any more")
            break
    data[id] = counter
    print("Client %d executed %d" % (id, counter))


def run_step():
    with Pool(args.num_clients) as p:
        p.starmap(run_client, [(id, data)
                               for id in range(1, args.num_clients + 1)])

def main():
    for step in range(1, args.num_steps + 1):
        print("#### Step %d" % step)
        run_memgraph()
        thread = threading.Thread(target=run_step)
        thread.daemon = True
        thread.start()
        # Kill Memgraph at an arbitrary point in time. It makes sense to
        # ensure that at least one snapshot gets generated, because we want
        # to test the entire recovery process (Snapshot + WAL).
        # It also makes sense to run this test for a longer period of time
        # but with a small --snapshot-cycle-sec, so that Memgraph is likely
        # to be killed in the middle of generating a snapshot.
        time.sleep(
            random.randint(2 * SNAPSHOT_CYCLE_SEC, 3 * SNAPSHOT_CYCLE_SEC))
        clean_memgraph()

    # Final check
    run_memgraph()
    session = SessionCache.argument_session(args)
    num_nodes_db = \
        session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"]
    num_nodes = sum(data.values())
    # Check that more than 99.9% of the data is recoverable.
    # NOTE: the default WAL flush interval is 1ms.
    # Once the full sync durability mode is introduced,
    # this test has to be extended.
    assert num_nodes_db > 0.999 * num_nodes, \
        "Memgraph lost more than 0.1% of the data!"


if __name__ == "__main__":
    main()
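The 0.1% loss budget in the final assert leaves a wide margin given the ~1 ms WAL flush interval mentioned in the comment. A back-of-the-envelope sketch of why; every number below is an assumption made up for illustration, not a measurement from this commit:

# Illustrative numbers only.
clients = 8               # assumed --num-clients on an 8-core machine
steps = 5                 # --num-steps
writes_per_sec = 20000    # assumed per-client CREATE throughput
step_seconds = 12.5       # midpoint of the 10-15 s random kill window

total_written = clients * steps * writes_per_sec * step_seconds
# With a ~1 ms WAL flush interval, each kill drops at most roughly one
# millisecond of un-flushed writes per client.
worst_case_lost = clients * steps * writes_per_sec * 0.001

print(worst_case_lost / total_written)  # ~8e-05, well below the 0.001 budget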