diff --git a/apollo_build.yaml b/apollo_build.yaml
index 20e8bcbff..df5759efc 100644
--- a/apollo_build.yaml
+++ b/apollo_build.yaml
@@ -24,7 +24,7 @@
cd build_release
cmake -DCMAKE_BUILD_TYPE=release ..
- TIMEOUT=1000 make -j$THREADS memgraph memgraph__macro_benchmark memgraph__stress
+ TIMEOUT=1000 make -j$THREADS memgraph memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot
cd ../../parent
@@ -40,6 +40,10 @@
cd ../../memgraph/tools
TIMEOUT=300 ./setup
+ # Generate distributed card fraud dataset
+ cd ../tests/distributed/card_fraud
+ ./generate_dataset.sh
+
- name: Debug build
project: ^mg-master-debug$
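
Note: the release build now also has to produce `memgraph__manual__card_fraud_generate_snapshot`, which `generate_dataset.sh` (added below) invokes. A hypothetical preflight check in Python, using the binary paths referenced elsewhere in this diff (the `memgraph` checkout root is an assumption):

```python
# hypothetical preflight check; the paths are the ones this diff references
# (the `memgraph` checkout root is an assumption)
import os

REQUIRED = [
    "build_release/memgraph",
    "build_release/tests/macro_benchmark/card_fraud_client",
    "build_release/tests/manual/card_fraud_generate_snapshot",
]

missing = [p for p in REQUIRED
           if not os.path.exists(os.path.join("memgraph", p))]
assert not missing, "missing binaries: {}".format(missing)
```
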
diff --git a/tests/distributed/card_fraud/.gitignore b/tests/distributed/card_fraud/.gitignore
new file mode 100644
index 000000000..6a72bbd8f
--- /dev/null
+++ b/tests/distributed/card_fraud/.gitignore
@@ -0,0 +1,2 @@
+output
+snapshots
diff --git a/tests/distributed/card_fraud/apollo_runs.py b/tests/distributed/card_fraud/apollo_runs.py
new file mode 100755
index 000000000..557ab9025
--- /dev/null
+++ b/tests/distributed/card_fraud/apollo_runs.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+import json
+import os
+import re
+import subprocess
+
+from card_fraud import NUM_MACHINES, BINARIES
+
+# paths
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+WORKSPACE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
+OUTPUT_DIR_REL = os.path.join(os.path.relpath(SCRIPT_DIR, WORKSPACE_DIR), "output")
+
+# generate runs
+runs = []
+
+binaries = [os.path.join("..", "..", "build_release", x) for x in BINARIES]
+
+for i in range(NUM_MACHINES):
+ name = "master" if i == 0 else "worker" + str(i)
+ additional = ["master.py"] if i == 0 else []
+ outfile_paths = ["\\./" + OUTPUT_DIR_REL + "/.+"] if i == 0 else []
+ if i == 0:
+ cmd = "master.py --machines-num {0} --test-suite card_fraud " \
+ "--test card_fraud".format(NUM_MACHINES)
+ else:
+ cmd = "jail_service.py"
+ runs.append({
+ "name": "distributed__card_fraud__" + name,
+ "cd": "..",
+ "supervisor": cmd,
+ "infiles": binaries + [
+ "common.py",
+ "jail_service.py",
+ "card_fraud/card_fraud.py",
+ "card_fraud/snapshots/worker_" + str(i),
+ "../../libs/postgresql/lib",
+ ] + additional,
+ "outfile_paths": outfile_paths,
+ "parallel_run": "distributed__card_fraud",
+ "slave_group": "remote_4c32g",
+ "enable_network": True,
+ })
+
+print(json.dumps(runs, indent=4, sort_keys=True))
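
Note: the script emits one run descriptor per machine, with the master driving `master.py` and every worker running `jail_service.py`. A minimal sketch of inspecting that output; the expected names are derived from the loop above with `NUM_MACHINES = 3`:

```python
# minimal sketch: run apollo_runs.py from tests/distributed/card_fraud and
# list the generated run names; expected output is derived from the loop above
import json
import subprocess

out = subprocess.run(["./apollo_runs.py"], capture_output=True, text=True,
                     check=True)
runs = json.loads(out.stdout)
print([r["name"] for r in runs])
# with NUM_MACHINES = 3:
# ['distributed__card_fraud__master',
#  'distributed__card_fraud__worker1',
#  'distributed__card_fraud__worker2']
```
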
diff --git a/tests/distributed/card_fraud/card_fraud.py b/tests/distributed/card_fraud/card_fraud.py
new file mode 100644
index 000000000..01e23abf7
--- /dev/null
+++ b/tests/distributed/card_fraud/card_fraud.py
@@ -0,0 +1,222 @@
+import json
+import os
+import time
+
+# to change the size of the cluster, just change this parameter
+NUM_MACHINES = 3
+
+# test setup
+SCENARIOS = ["point_lookup", "create_tx_without_edge"]
+DURATION = 300
+WORKERS = 6
+
+# constants
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+MEMGRAPH_BINARY = "memgraph"
+CLIENT_BINARY = "tests/macro_benchmark/card_fraud_client"
+BINARIES = [MEMGRAPH_BINARY, CLIENT_BINARY]
+
+# wrappers
+class WorkerWrapper:
+ def __init__(self, address, worker):
+ self._address = address
+ self._worker = worker
+ self._tid = worker.get_jail()
+
+ def get_address(self):
+ return self._address
+
+ def __getattr__(self, name):
+ # these methods don't take a jail tid as their first argument, so they
+ # are forwarded to the jail service unchanged
+ if name in ["allocate_file", "read_file", "store_label"]:
+ return getattr(self._worker, name)
+ # everything else operates on this wrapper's jail, so prepend the tid
+ def func(*args, **kwargs):
+ args = [self._tid] + list(args)
+ return getattr(self._worker, name)(*args, **kwargs)
+ return func
+
+class MgCluster:
+ def __init__(self, machine_ids, workers):
+ # create wrappers
+ self._master = WorkerWrapper(os.environ[machine_ids[0]],
+ workers[machine_ids[0]])
+ self._workers = []
+ for machine_id in machine_ids[1:]:
+ self._workers.append(WorkerWrapper(os.environ[machine_id],
+ workers[machine_id]))
+
+ def start(self):
+ # start memgraph master
+ self._master.start(MEMGRAPH_BINARY, [
+ "--master",
+ "--master-host", self._master.get_address(),
+ "--master-port", "10000",
+ "--durability-directory", os.path.join(SCRIPT_DIR, "snapshots",
+ "worker_0"),
+ "--db-recover-on-startup",
+ "--query-vertex-count-to-expand-existing", "-1",
+ "--num-workers", str(WORKERS),
+ "--rpc-num-workers", str(WORKERS),
+ ])
+
+ # sleep to allow the master to start up
+ time.sleep(5)
+
+ # start memgraph workers
+ for i, worker in enumerate(self._workers, start=1):
+ worker.start(MEMGRAPH_BINARY, [
+ "--worker", "--worker-id", str(i),
+ "--worker-host", worker.get_address(),
+ "--worker-port", str(10000 + i),
+ "--master-host", self._master.get_address(),
+ "--master-port", "10000",
+ "--durability-directory", os.path.join(SCRIPT_DIR, "snapshots",
+ "worker_" + str(i)),
+ "--db-recover-on-startup",
+ "--num-workers", str(WORKERS),
+ "--rpc-num-workers", str(WORKERS),
+ ])
+
+ # sleep to allow the workers to start up
+ time.sleep(5)
+
+ # store initial usage
+ self._usage_start = [self._master.get_usage()]
+ for worker in self._workers:
+ self._usage_start.append(worker.get_usage())
+ self._usage_start_time = time.time()
+
+ def get_master_address(self):
+ return self._master.get_address()
+
+ def check_status(self):
+ if not self._master.check_status():
+ return False
+ for worker in self._workers:
+ if not worker.check_status():
+ return False
+ return True
+
+ def stop(self):
+ # store final usage
+ self._usage_stop = [self._master.get_usage()]
+ for worker in self._workers:
+ self._usage_stop.append(worker.get_usage())
+ self._usage_stop_time = time.time()
+
+ # stop the master
+ self._master.stop()
+
+ # wait to allow the master and workers to die
+ time.sleep(5)
+
+ # stop the workers
+ for worker in self._workers:
+ worker.stop()
+
+ # wait to allow the workers to die
+ time.sleep(5)
+
+ def get_usage(self):
+ ret = []
+ tdelta = self._usage_stop_time - self._usage_start_time
+ for val_start, val_stop in zip(self._usage_start, self._usage_stop):
+ data = {
+ "cpu": (val_stop["cpu"] - val_start["cpu"]) / tdelta,
+ "memory": val_stop["max_memory"] / 1024,
+ "threads": val_stop["max_threads"],
+ "network": {}
+ }
+ net_start = val_start["network"]["eth0"]
+ net_stop = val_stop["network"]["eth0"]
+ for i in ["bytes", "packets"]:
+ data["network"][i] = {}
+ for j in ["rx", "tx"]:
+ data["network"][i][j] = (net_stop[i][j] -
+ net_start[i][j]) / tdelta
+ ret.append(data)
+ return ret
+
+ def store_label(self, label):
+ self._master.store_label(label)
+ for worker in self._workers:
+ worker.store_label(label)
+
+def write_scenario_summary(scenario, throughput, usage, output):
+ output.write("Scenario **{}** throughput !!{:.2f}!! queries/s.\n\n".format(
+ scenario, throughput))
+ headers = ["Memgraph", "CPU", "Max memory", "Max threads",
+ "Network RX", "Network TX"]
+ output.write("
\n")
+ for header in headers:
+ output.write("{} | ".format(header))
+ output.write("
\n")
+ for i, current in enumerate(usage):
+ name = "master" if i == 0 else "worker" + str(i)
+ output.write("{} | ".format(name))
+ for key, unit in [("cpu", "s/s"), ("memory", "MiB"), ("threads", "")]:
+ fmt = ".2f" if key != "threads" else ""
+ output.write(("{:" + fmt + "} {} | ").format(
+ current[key], unit).strip())
+ for key in ["rx", "tx"]:
+ output.write("{:.2f} packets/s | ".format(
+ current["network"]["packets"][key]))
+ output.write("
\n")
+ output.write("
\n\n")
+
+# main test function
+def run(machine_ids, workers):
+ # create output directory
+ output_dir = os.path.join(SCRIPT_DIR, "output")
+ if not os.path.exists(output_dir):
+ os.mkdir(output_dir)
+
+ # create memgraph cluster and client
+ mg_cluster = MgCluster(machine_ids, workers)
+ mg_client = WorkerWrapper(os.environ[machine_ids[0]],
+ workers[machine_ids[0]])
+
+ # execute the tests
+ stats = {}
+ for scenario in SCENARIOS:
+ output_file = os.path.join(output_dir, scenario + ".json")
+
+ print("Starting memgraph cluster")
+ mg_cluster.store_label("Start: cluster")
+ mg_cluster.start()
+
+ print("Starting client scenario:", scenario)
+ mg_cluster.store_label("Start: " + scenario)
+ mg_client.start(CLIENT_BINARY, [
+ "--address", mg_cluster.get_master_address(),
+ "--group", "card_fraud",
+ "--scenario", scenario,
+ "--duration", str(DURATION),
+ "--num-workers", str(WORKERS),
+ "--output", output_file,
+ ])
+
+ # wait for the client to terminate and check the cluster status
+ while mg_client.check_status():
+ assert mg_cluster.check_status(), "The memgraph cluster has died!"
+ time.sleep(2)
+
+ # stop everything
+ mg_client.wait()
+ mg_cluster.store_label("Stop: " + scenario)
+ mg_cluster.stop()
+ mg_cluster.store_label("Stop: cluster")
+
+ # process the stats
+ with open(output_file) as f:
+ lines = [x for x in f.read().split("\n") if x.strip()]
+ data = json.loads(lines[-1])
+ throughput = data["num_executed_queries"] / data["elapsed_time"]
+ usage = mg_cluster.get_usage()
+ stats[scenario] = (throughput, usage)
+
+ # dump the stats
+ stats_file = open(os.path.join(output_dir, ".card_fraud_summary"), "w")
+ stats_file.write("==== Distributed card fraud summary: ====\n\n")
+ for scenario in SCENARIOS:
+ throughput, usage = stats[scenario]
+ write_scenario_summary(scenario, throughput, usage, stats_file)
+ stats_file.close()
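
Note: the stats step in `run()` treats the last non-empty line of the client's output file as the final counter snapshot. The same computation as a standalone sketch:

```python
# standalone sketch of the throughput computation in run(); assumes the client
# writes one JSON object per line and the last line holds the final counters
import json

def parse_throughput(path):
    with open(path) as f:
        lines = [line for line in f.read().split("\n") if line.strip()]
    data = json.loads(lines[-1])
    return data["num_executed_queries"] / data["elapsed_time"]
```
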
diff --git a/tests/distributed/card_fraud/config.json b/tests/distributed/card_fraud/config.json
new file mode 100644
index 000000000..62a3b5ca2
--- /dev/null
+++ b/tests/distributed/card_fraud/config.json
@@ -0,0 +1,20 @@
+{
+ "indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"],
+ "nodes" : [
+ {
+ "count_per_worker" : 10000,
+ "label" : "Card"
+ },
+ {
+ "count_per_worker" : 1000,
+ "label" : "Pos"
+ },
+ {
+ "count_per_worker" : 50000,
+ "label" : "Transaction"
+ }
+ ],
+ "compromised_pos_probability" : 0.2,
+ "fraud_reported_probability" : 0.1,
+ "hop_probability" : 0.1
+}
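
Note: all `count_per_worker` values scale with the cluster size, so for the default `NUM_MACHINES = 3` the generated dataset works out to the totals below:

```python
# dataset totals implied by config.json; counts are per worker, so each
# label is multiplied by the machine count (NUM_MACHINES = 3 by default)
NUM_MACHINES = 3
counts_per_worker = {"Card": 10000, "Pos": 1000, "Transaction": 50000}
totals = {label: n * NUM_MACHINES for label, n in counts_per_worker.items()}
print(totals)  # {'Card': 30000, 'Pos': 3000, 'Transaction': 150000}
```
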
diff --git a/tests/distributed/card_fraud/generate_dataset.sh b/tests/distributed/card_fraud/generate_dataset.sh
new file mode 100755
index 000000000..b218050c1
--- /dev/null
+++ b/tests/distributed/card_fraud/generate_dataset.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+cd "$script_dir"
+
+output_dir=snapshots
+
+if [ -d $output_dir ]; then
+ rm -rf $output_dir
+fi
+
+NUM_MACHINES="$( cat card_fraud.py | grep -m1 "NUM_MACHINES" | tail -c 2 )"
+
+../../../build_release/tests/manual/card_fraud_generate_snapshot --config config.json --num-workers $NUM_MACHINES --dir $output_dir
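
Note: grepping the constant out of `card_fraud.py` works, but since that module is importable, a hypothetical helper script could make Python the single source of truth:

```python
# hypothetical get_num_machines.py helper; a shell script could then use
#   NUM_MACHINES="$(python3 get_num_machines.py)"
# instead of grepping the constant out of card_fraud.py
from card_fraud import NUM_MACHINES

print(NUM_MACHINES)
```
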
diff --git a/tests/distributed/jail_service.py b/tests/distributed/jail_service.py
index 50f74006c..20e0d40c5 100755
--- a/tests/distributed/jail_service.py
+++ b/tests/distributed/jail_service.py
@@ -38,6 +38,12 @@ class JailService:
self._generated_filenames = []
self.tempdir = tempfile.TemporaryDirectory()
+ def _get_proc(self, tid):
+ if tid not in self.processes:
+ raise Exception(
+ "Binary with tid {tid} does not exist".format(tid=tid))
+ return self.processes[tid]
+
def start(self, tid, binary_name, binary_args=None):
self.log.info("Starting Binary: {binary}".format(binary=binary_name))
self.log.info("With args: {args}".format(args=binary_args))
@@ -50,7 +56,7 @@ class JailService:
binary = get_absolute_path(binary_name, "build_release")
# fetch process
- proc = self.processes[tid]
+ proc = self._get_proc(tid)
# start binary
proc.run(binary, args=binary_args, timeout=600)
@@ -59,13 +65,29 @@ class JailService:
binary=binary_name, tid=proc._tid)
self.log.info(msg)
+ def check_status(self, tid):
+ proc = self._get_proc(tid)
+ status = proc.get_status()
+ if status is None:
+ return True
+ assert status == 0, "The binary exited with a non-zero status!"
+ return False
+
+ def get_usage(self, tid):
+ usage = self._get_proc(tid).get_usage()
+ usage.update({"network": jail.get_network_usage()})
+ return usage
+
+ def wait(self, tid):
+ proc = self._get_proc(tid)
+ proc.wait()
+
def stop(self, tid):
self.log.info("Stopping binary with tid {tid}".format(tid=tid))
- if tid not in self.processes:
- raise Exception(
- "Binary with tid {tid} does not exist".format(tid=tid))
- proc = self.processes[tid]
- proc.send_signal(jail.SIGTERM)
+ proc = self._get_proc(tid)
+ try:
+ proc.send_signal(jail.SIGTERM)
+ except Exception:
+ # the process may have already exited; the wait() below still reaps it
+ pass
proc.wait()
self.log.info("Binary with tid {tid} stopped".format(tid=tid))
@@ -88,6 +110,13 @@ class JailService:
self.processes[proc._tid] = proc
return proc._tid
+ def store_label(self, label):
+ jail.store_label(label)
+
+ def shutdown(self):
+ self.log.info("Stopping Jail Service")
+ os._exit(0)
+
def main():
# set port dynamically
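
Note: the new `check_status` gives callers a simple contract: `True` while the binary is running, `False` after a clean exit, and an `AssertionError` on a non-zero exit. A minimal polling sketch built on that contract (`service` and `tid` stand for an RPC proxy to the JailService and a jail id from `get_jail()`):

```python
# minimal polling sketch; `service` stands for an RPC proxy to the
# JailService and `tid` for the jail id returned by get_jail()
import time

def wait_for_exit(service, tid, interval=2):
    # check_status() raises AssertionError if the binary exited non-zero
    while service.check_status(tid):
        time.sleep(interval)
```
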
diff --git a/tests/distributed/local_runner b/tests/distributed/local_runner
index 4d43f832d..922a73a05 100755
--- a/tests/distributed/local_runner
+++ b/tests/distributed/local_runner
@@ -38,6 +38,7 @@ done
quit()
{
# Stop workers
+ sleep 1
for i in `seq 2 $NUM_MACHINES`;
do
kill ${pids[$i]}
diff --git a/tests/distributed/master.py b/tests/distributed/master.py
index 817eb89f8..fd0b830c4 100755
--- a/tests/distributed/master.py
+++ b/tests/distributed/master.py
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
+import atexit
import importlib
import logging
import os
@@ -35,6 +36,7 @@ def main(args):
workers = {}
machine_ids = []
machines_num = int(args.machines_num)
+
# initialize workers
for i in range(machines_num):
id = i + 1
@@ -54,6 +56,15 @@ def main(args):
workers[machine_id] = worker
+ # cleanup at exit
+ @atexit.register
+ def cleanup():
+ for machine_id in machine_ids[1:]:
+ try:
+ workers[machine_id].shutdown()
+ except ConnectionRefusedError:
+ pass
+
# run test
test = importlib.import_module(
"{suite}.{test}".format(suite=args.test_suite, test=args.test))
diff --git a/tests/macro_benchmark/jail_faker.py b/tests/macro_benchmark/jail_faker.py
index 7dd528a3b..f536785b3 100644
--- a/tests/macro_benchmark/jail_faker.py
+++ b/tests/macro_benchmark/jail_faker.py
@@ -1,6 +1,6 @@
#!/usr/bin/python3
-
import atexit
+import copy
import json
import os
import resource
@@ -13,10 +13,6 @@ import uuid
from signal import *
-SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-STORAGE_DIR = os.path.join(SCRIPT_DIR, ".storage")
-
-
class ProcessException(Exception):
pass
@@ -63,14 +59,14 @@ class Process:
def run_and_wait(self, *args, **kwargs):
check = kwargs.pop("check", True)
self.run(*args, **kwargs)
- return self.wait()
+ return self.wait(check)
def wait(self, check = True):
if self._proc == None:
raise ProcessException
self._proc.wait()
if check and self._proc.returncode != 0:
- raise ProcessException
+ raise ProcessException("Command returned non-zero!")
return self._proc.returncode
def get_status(self):
@@ -87,7 +83,7 @@ class Process:
def get_usage(self):
if self._proc == None:
raise ProcessException
- return self._usage
+ return copy.deepcopy(self._usage)
# this is implemented only in the real API
def set_cpus(self, cpus, hyper=True):
@@ -136,9 +132,11 @@ class Process:
except:
return
# for a description of these fields see: man proc; man times
- cpu_time = sum(map(lambda x: int(x) / self._ticks_per_sec,
- data_stat[13:17]))
- self._set_usage(cpu_time, "cpu", only_value = True)
+ utime, stime, cutime, cstime = map(
+ lambda x: int(x) / self._ticks_per_sec, data_stat[13:17])
+ self._set_usage(utime + stime + cutime + cstime, "cpu", only_value=True)
+ self._set_usage(utime + cutime, "cpu_user", only_value=True)
+ self._set_usage(stime + cstime, "cpu_sys", only_value=True)
self._set_usage(int(data_stat[19]), "threads")
mem_vm, mem_res, mem_shr = map(
lambda x: int(x) * self._page_size // 1024, data_statm[:3])
@@ -161,7 +159,6 @@ class Process:
PROCESSES_NUM = 8
_processes = [Process(i) for i in range(1, PROCESSES_NUM + 1)]
_last_process = 0
-_thread_run = True
def _usage_updater():
while True:
@@ -172,20 +169,12 @@ def _usage_updater():
_thread = threading.Thread(target=_usage_updater, daemon=True)
_thread.start()
-if not os.path.exists(STORAGE_DIR):
- os.mkdir(STORAGE_DIR)
-
-_storage_name = os.path.join(
- STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json")
-_storage_file = open(_storage_name, "w")
-
@atexit.register
def cleanup():
for proc in _processes:
if proc._proc == None: continue
proc.send_signal(SIGKILL)
proc.get_status()
- _storage_file.close()
# end of private methods ------------------------------------------------------
@@ -230,11 +219,25 @@ def get_host_info():
return {"cpus": cpus, "memory": memory, "hyperthreading": hyper,
"threads": threads}
+# placeholder; the real jail API stores the given label
+def store_label(label):
+ if not isinstance(label, str):
+ raise Exception("Label must be a string!")
+
+# this function is deprecated
def store_data(data):
- if not type(data) == dict:
- raise StorageException("Data must be a dictionary!")
- for i in ["unit", "type", "value"]:
- if not i in data:
- raise StorageException("Field '{}' missing in data!".format(i))
- data["timestamp"] = time.time()
- _storage_file.write(json.dumps(data) + "\n")
+ pass
+
+# placeholder; the real jail API returns actual network usage counters
+def get_network_usage():
+ usage = {
+ "lo": {
+ "bytes": {"rx": 0, "tx": 0},
+ "packets": {"rx": 0, "tx": 0}
+ },
+ "eth0": {
+ "bytes": {"rx": 0, "tx": 0},
+ "packets": {"rx": 0, "tx": 0}
+ }
+ }
+ return usage
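
Note: these zeroed counters mirror the shape the real API returns, and `MgCluster.get_usage()` in `card_fraud.py` consumes them as start/stop snapshots. The rate computation applied to a pair of such snapshots:

```python
# sketch: how card_fraud.py turns two network snapshots into per-second rates
net_start = {"bytes": {"rx": 0, "tx": 0}, "packets": {"rx": 0, "tx": 0}}
net_stop = {"bytes": {"rx": 4096, "tx": 2048}, "packets": {"rx": 8, "tx": 4}}
tdelta = 2.0  # seconds between the start and stop snapshots

rates = {i: {j: (net_stop[i][j] - net_start[i][j]) / tdelta
             for j in ("rx", "tx")}
         for i in ("bytes", "packets")}
print(rates)
# {'bytes': {'rx': 2048.0, 'tx': 1024.0}, 'packets': {'rx': 4.0, 'tx': 2.0}}
```
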