Add manual .py test asserting forward progress across reconnects (#408)
This commit is contained in:
parent
285b409927
commit
259cba5d43
127
tests/manual/test_forward_client_progress.py
Executable file
127
tests/manual/test_forward_client_progress.py
Executable file
@ -0,0 +1,127 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
This script can be pointed at a memgraph server that
|
||||
you wish to assert "forward progress" for concurrent
|
||||
clients on, for instance, for testing replication or
|
||||
durability in the face of machine failures.
|
||||
|
||||
Of particular note are the arguments:
|
||||
--host the first connection is made to this host:port
|
||||
--port
|
||||
--reconnect-host after any disconnects, reconnection is attempted to reconnect-host:reconnect-port
|
||||
--reconnect-port
|
||||
|
||||
For any testing, it is likely that you will want to set the server argument:
|
||||
--storage-wal-file-flush-every-n-tx=1
|
||||
which will ensure that the client will receive no success responses unless the
|
||||
data is fsynced on disk.
|
||||
|
||||
For example, this will let you keep running a server container from the terminal in
|
||||
a loop that is fairly kill friendly, also mounting an external directory into
|
||||
/var/lib/memgraph to retain storage files across restarts:
|
||||
|
||||
terminal 1:
|
||||
run this script
|
||||
terminal 2: (server running in a restart loop if it gets killed)
|
||||
while true; do docker run -v some/local/storage/directory:/var/lib/memgraph -it -p 7687:7687 -p 3000:3000 memgraph/memgraph --storage-wal-file-flush-every-n-tx=1; done
|
||||
terminal 3: (killer that will restart it every few seconds)
|
||||
while true; do sleep `shuf -i 1-5 -n1`; pkill 'docker' -u `id -u`; done
|
||||
|
||||
If you are testing with replication, it makes the most sense
|
||||
to configure SYNC replication, and to set the fall-back
|
||||
reconnect-host and reconnect-port to the replica that will
|
||||
take over after you kill main.
|
||||
|
||||
If you set the --ops argument to 0, it will run forever,
|
||||
which may be useful for long-running correctness tests.
|
||||
|
||||
It works by having each concurrent worker process (--concurrency argument)
|
||||
create a unique vertex that has a "counter" attribute. This counter
|
||||
attribute is expected to always make forward progress. The client
|
||||
remembers the last value that it wrote, and it asserts that if it
|
||||
ever receives a successful response for a write that it does, that
|
||||
this counter has advanced by one.
|
||||
"""
|
||||
|
||||
from os import kill
|
||||
from multiprocessing import Process, Event
|
||||
from uuid import uuid4
|
||||
from time import sleep
|
||||
|
||||
import argparse
|
||||
import mgclient
|
||||
|
||||
shutdown = Event()
|
||||
|
||||
|
||||
def _connect(host, port):
    """Open an autocommit connection to host:port and return (connection, cursor)."""
    conn = mgclient.connect(host=host, port=port)
    conn.autocommit = True
    return conn, conn.cursor()


def forward_progress(args):
    """Repeatedly bump one vertex's counter, checking it only moves forward.

    Creates a `Vsn` vertex with a fresh UUID and `counter: 0`, then makes up
    to `args.ops` attempts to advance the counter by one. Each attempt only
    matches the vertex if its stored counter equals the last value this
    worker successfully wrote, so a successful response that does not return
    exactly one row means the server lost or duplicated state.

    Args:
        args: Parsed command-line namespace. Reads `host`, `port`,
            `reconnect_host`, `reconnect_port` and `ops`.

    Returns:
        None. Returns early once the module-level `shutdown` event is set,
        either by this worker or by another one.

    Raises:
        mgclient.InterfaceError: Re-raised when the error is not the
            expected "bad session" failure of a dropped connection.
    """
    # `conn` is kept bound alongside the cursor for the cursor's lifetime.
    conn, cursor = _connect(args.host, args.port)

    # Named vertex_id rather than id so the builtin is not shadowed; the
    # query parameter itself is still called "id".
    vertex_id = str(uuid4())

    cursor.execute("CREATE (v:Vsn { id: $id, counter: 0 }) RETURN v;", {"id": vertex_id})
    cursor.fetchall()

    counter = 0

    # A failed attempt still consumes one of the args.ops iterations.
    for _ in range(args.ops):
        if shutdown.is_set():
            return
        try:
            cursor.execute(
                """
                MATCH (v: Vsn { id: $id, counter: $old })
                SET v.counter = $new
                RETURN v
                """,
                {"id": vertex_id, "old": counter, "new": counter + 1},
            )
            ret = cursor.fetchall()
        except mgclient.DatabaseError as e:
            # "failed to receive" is the expected symptom of the server
            # going away mid-request; anything else is a test failure.
            if "failed to receive" not in str(e):
                print("encountered unexpected exception:", str(e))
                shutdown.set()
                return
        except mgclient.InterfaceError as e:
            # Explicit check instead of `assert`, which is stripped when
            # Python runs with -O.
            if "bad session" not in str(e):
                print("encountered unexpected exception:", str(e))
                shutdown.set()
                raise
        else:
            if len(ret) != 1:
                print("expected there to be exactly one Vsn associated with this counter, but we got:", ret)
                shutdown.set()
                return
            counter += 1
            continue

        # NOTE(review): `counter` is not advanced when the response is lost.
        # If the server had already applied the write, the next MATCH on the
        # old value would presumably return no rows and be reported as a
        # failure -- confirm this is the intended signal.
        print("waiting for server to come back")

        sleep(1)

        try:
            conn, cursor = _connect(args.reconnect_host, args.reconnect_port)
        except Exception as e:
            # Best effort: the server may still be down, so retry on the next
            # iteration. `Exception` (not a bare except) lets
            # KeyboardInterrupt and SystemExit stop the worker.
            print("reconnect failed, will retry:", e)
|
||||
|
||||
|
||||
# Worker processes started by the script entry point below.
children = []
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Assert forward progress of concurrent clients across server reconnects."
    )
    parser.add_argument("-s", "--host", default="127.0.0.1", help="host for the first connection")
    parser.add_argument("-p", "--port", type=int, default=7687, help="port for the first connection")
    parser.add_argument("--reconnect-host", default="127.0.0.1", help="host to reconnect to after a disconnect")
    parser.add_argument("--reconnect-port", type=int, default=7687, help="port to reconnect to after a disconnect")
    parser.add_argument("-c", "--concurrency", type=int, default=20, help="number of concurrent worker processes")
    parser.add_argument("-o", "--ops", type=int, default=1000, help="attempts per worker; 0 means run indefinitely")
    args = parser.parse_args()

    # 0 means "run forever"; 2**64 iterations is unbounded in practice.
    if args.ops == 0:
        args.ops = 2**64

    for _ in range(args.concurrency):
        child = Process(target=forward_progress, args=(args,))
        child.start()
        children.append(child)

    for child in children:
        child.join()

    # Report failure through the exit status so callers can detect it.
    # Exit codes are checked as well as the shutdown flag because a worker
    # that dies from an uncaught exception may not have set the flag.
    if shutdown.is_set() or any(child.exitcode != 0 for child in children):
        raise SystemExit(1)
|
Loading…
Reference in New Issue
Block a user