diff --git a/tests/distributed/card_fraud/card_fraud.py b/tests/distributed/card_fraud/card_fraud.py
index 20a540c07..8d7b71193 100644
--- a/tests/distributed/card_fraud/card_fraud.py
+++ b/tests/distributed/card_fraud/card_fraud.py
@@ -1,6 +1,7 @@
 import json
 import os
 import time
+import subprocess
 
 # to change the size of the cluster, just change this parameter
 NUM_MACHINES = 3
@@ -16,6 +17,26 @@ MEMGRAPH_BINARY = "memgraph_distributed"
 CLIENT_BINARY = "tests/macro_benchmark/card_fraud_client"
 BINARIES = [MEMGRAPH_BINARY, CLIENT_BINARY]
 
+# helpers
+def wait_for_server(address, port, delay=0.2, timeout=20):
+    """Block until `nc -z` can connect to address:port.
+
+    The port is probed repeatedly; once a probe succeeds the function
+    sleeps for another `delay` seconds and returns. An Exception is raised
+    if no probe succeeds within `timeout` seconds.
+    """
+    cmd = ["nc", "-z", "-w", "1", address, str(port)]
+    # Use a wall-clock deadline: counting loop iterations ignores the time
+    # spent inside each `nc` call (which may block because of `-w 1`), so
+    # the real limit could end up far beyond the intended 20 seconds.
+    deadline = time.monotonic() + timeout
+    while subprocess.call(cmd) != 0:
+        if time.monotonic() > deadline:
+            raise Exception("Could not wait for server {}:{} to "
+                            "startup!".format(address, port))
+        time.sleep(0.01)
+    time.sleep(delay)
+
 # wrappers
 class WorkerWrapper:
     def __init__(self, address, worker):
@@ -60,8 +81,8 @@ class MgCluster:
             "--recovering-cluster-size", str(len(self._workers) + 1)
         ])
 
-        # sleep to allow the master to startup
-        time.sleep(5)
+        # wait for the master to finish startup
+        wait_for_server(self._master.get_address(), 10000)
 
         # start memgraph workers
         for i, worker in enumerate(self._workers, start=1):
@@ -79,8 +100,12 @@ class MgCluster:
                 "--rpc-num-server-workers", str(WORKERS),
             ])
 
-        # sleep to allow the workers to startup
-        time.sleep(15)
+        # wait for the workers to finish startup
+        for i, worker in enumerate(self._workers, start=1):
+            wait_for_server(worker.get_address(), 10000 + i)
+
+        # wait for the Bolt server to startup
+        wait_for_server(self._master.get_address(), 7687)
 
         # store initial usage
         self._usage_start = [self._master.get_usage()]
diff --git a/tests/distributed/local_runner b/tests/distributed/local_runner
index 922a73a05..5aac33a3b 100755
--- a/tests/distributed/local_runner
+++ b/tests/distributed/local_runner
@@ -41,7 +41,7 @@ quit()
     sleep 1
 
     for i in `seq 2 $NUM_MACHINES`; do
-        kill ${pids[$i]}
+        kill ${pids[$i]} || true
     done
 }
 