Implement manage script for card fraud demo
Summary: Script provides helper actions for demo: 1) Copy memgraph to cluster 2) Copy durability directories to cluster 3) Start master and workers 4) Stop memgraph cluster 5) Clean memgraph directories 6) Clean durability directories Cluster config should be provided in separate json file. Reviewers: buda, mtomic Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1180
This commit is contained in:
parent
c1e4676316
commit
b99436910f
@ -2,18 +2,19 @@
|
||||
"indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"],
|
||||
"nodes" : [
|
||||
{
|
||||
"count_per_worker" : 10,
|
||||
"count_per_worker" : 1250000,
|
||||
"label" : "Card"
|
||||
},
|
||||
{
|
||||
"count_per_worker" : 10,
|
||||
"count_per_worker" : 1250000,
|
||||
"label" : "Pos"
|
||||
},
|
||||
{
|
||||
"count_per_worker" : 20,
|
||||
"count_per_worker" : 2500000,
|
||||
"label" : "Transaction"
|
||||
}
|
||||
],
|
||||
"compromised_pos_probability" : 0.2,
|
||||
"fraud_reported_probability" : 0.1,
|
||||
"hop_probability" : 0.1
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include <experimental/filesystem>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
@ -17,8 +18,8 @@
|
||||
|
||||
DEFINE_string(num_workers, "1",
|
||||
"Number of distributed workers (including master)");
|
||||
DEFINE_string(durability_directory_prefix, "tmp",
|
||||
"Prefix for durability directories");
|
||||
DEFINE_string(dir, "tmp",
|
||||
"Directory for storing workers durability directories.");
|
||||
DEFINE_string(config, "", "Path to config JSON file");
|
||||
|
||||
/**
|
||||
@ -41,6 +42,7 @@ DEFINE_string(config, "", "Path to config JSON file");
|
||||
* }
|
||||
* ],
|
||||
* "compromised_pos_probability" : 0.2,
|
||||
* "fraud_reported_probability" : 0.1,
|
||||
* "hop_percentage" : 0.1
|
||||
*}
|
||||
*/
|
||||
@ -359,6 +361,7 @@ int main(int argc, char **argv) {
|
||||
const fs::path kSnapshotDir = "snapshots";
|
||||
|
||||
double compromised_pos_probability = config["compromised_pos_probability"];
|
||||
double fraud_reported_probability = config["fraud_reported_probability"];
|
||||
double hop_probability = config["hop_probability"];
|
||||
|
||||
LOG(INFO) << "Creating snapshots with config: ";
|
||||
@ -450,8 +453,8 @@ int main(int argc, char **argv) {
|
||||
// Write snapshot files.
|
||||
LOG(INFO) << "Writing snapshots...";
|
||||
for (int worker_id = 0; worker_id < num_workers; ++worker_id) {
|
||||
const fs::path durability_dir = FLAGS_durability_directory_prefix +
|
||||
"_worker_" + std::to_string(worker_id);
|
||||
const fs::path durability_dir =
|
||||
FLAGS_dir / fs::path("worker_" + std::to_string(worker_id));
|
||||
if (!durability::EnsureDir(durability_dir / kSnapshotDir)) {
|
||||
LOG(ERROR) << "Unable to create durability directory!";
|
||||
exit(0);
|
||||
@ -501,11 +504,9 @@ int main(int argc, char **argv) {
|
||||
DCHECK(out_edges[1].type == GraphState::Edge::Type::AT);
|
||||
DCHECK(in_edges.size() == 0);
|
||||
bool fraud_reported = false;
|
||||
if (state.IsCompromisedCard(out_edges[1].to.first,
|
||||
out_edges[1].to.second) &&
|
||||
state.IsCompromisedPos(out_edges[0].to.first,
|
||||
if (state.IsCompromisedCard(out_edges[0].to.first,
|
||||
out_edges[0].to.second)) {
|
||||
fraud_reported = value_generator.Bernoulli(0.1);
|
||||
fraud_reported = value_generator.Bernoulli(fraud_reported_probability);
|
||||
}
|
||||
auto props = value_generator.MakeTxProperties(fraud_reported, worker_id);
|
||||
writer.WriteNode(tx_id, std::vector<std::string>{kLabelTransaction},
|
||||
|
338
tools/manage_distributed_card_fraud
Executable file
338
tools/manage_distributed_card_fraud
Executable file
@ -0,0 +1,338 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from argparse import ArgumentParser
|
||||
from collections import namedtuple
|
||||
from pathlib import Path
|
||||
|
||||
"""
|
||||
Script provides helper actions for strata card fraud demo:
|
||||
1) Copy memgraph to cluster
|
||||
2) Copy durability directories to cluster
|
||||
3) Start master and workers
|
||||
4) Stop memgraph cluster
|
||||
5) Clean memgraph directories
|
||||
6) Clean durability directories
|
||||
|
||||
Cluster config should be provided in separate json file.
|
||||
|
||||
Example usage:
|
||||
```./manage_distributed_card_fraud memgraph-copy --config config.json```
|
||||
|
||||
Example json config:
|
||||
{
|
||||
"workload_machines":
|
||||
[
|
||||
{
|
||||
"host" : "distcardfraud2.memgraph.io",
|
||||
"type" : "master",
|
||||
"address" : "10.1.13.5",
|
||||
"port" : 10000,
|
||||
"num_workers" : 4,
|
||||
"ssh_port" : 60022
|
||||
},
|
||||
{
|
||||
"host" : "distcardfraud3.memgraph.io",
|
||||
"type" : "worker",
|
||||
"address" : "10.1.13.6",
|
||||
"port" : 10001,
|
||||
"num_workers" : 2,
|
||||
"ssh_port" : 60022
|
||||
},
|
||||
{
|
||||
"host" : "distcardfraud4.memgraph.io",
|
||||
"type" : "worker",
|
||||
"address" : "10.1.13.7",
|
||||
"port" : 10002,
|
||||
"num_workers" : 2,
|
||||
"ssh_port" : 60022
|
||||
}
|
||||
],
|
||||
"benchmark_machines":["distcardfraud1.memgraph.io"],
|
||||
"remote_user": "memgraph",
|
||||
"ssh_public_key" : "~/.ssh/id_rsa",
|
||||
"memgraph_build_dir" : "~/Development/memgraph/build",
|
||||
"memgraph_remote_dir" : "/home/memgraph/memgraph_remote",
|
||||
"durability_dir" : "~/Development/memgraph/build/tests/manual/test_dir",
|
||||
"durability_remote_dir" : "/home/memgraph/memgraph_remote/durability",
|
||||
"logs_dir" : "~/Development/memgraph/logs",
|
||||
"logs_remote_dir" : "/home/memgraph/memgraph_remote/logs"
|
||||
}
|
||||
"""
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
log = logging.getLogger("RemoteRunner")
|
||||
|
||||
|
||||
def parse_args():
|
||||
"""
|
||||
Parse command line arguments
|
||||
"""
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
actions = [func for func in dir(RemoteRunner)
|
||||
if callable(getattr(RemoteRunner, func))]
|
||||
actions = [action.replace('_', '-') for action in actions
|
||||
if not action.startswith('_')]
|
||||
|
||||
parser.add_argument("action", metavar="action", choices=actions,
|
||||
help=", ".join(actions))
|
||||
parser.add_argument("--config", default="config.json",
|
||||
help="Config for cluster.")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
class Machine(namedtuple('Machine', ['host', 'type', 'address',
|
||||
'port', 'num_workers', 'ssh_port'])):
|
||||
__slots__ = () # prevent creation of instance dictionaries
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
assert isinstance(self.port, int), "port must be an integer"
|
||||
assert isinstance(self.num_workers, int), "num_workers must be \
|
||||
an integer"
|
||||
assert isinstance(self.ssh_port, int), "ssh_port must be an integer"
|
||||
|
||||
|
||||
class Config:
|
||||
|
||||
def __init__(self, config_file):
|
||||
data = json.load(open(config_file))
|
||||
self.workload_machines = [Machine(**config)
|
||||
for config in data["workload_machines"]]
|
||||
self.benchmark_machines = data["benchmark_machines"]
|
||||
self.remote_user = data["remote_user"]
|
||||
self.ssh_public_key = data["ssh_public_key"]
|
||||
self.ssh_public_key = str(Path(self.ssh_public_key).expanduser())
|
||||
|
||||
self.memgraph_build_dir = data["memgraph_build_dir"]
|
||||
self.memgraph_build_dir = str(
|
||||
Path(self.memgraph_build_dir).expanduser())
|
||||
self.memgraph_remote_dir = data["memgraph_remote_dir"]
|
||||
|
||||
self.durability_dir = data["durability_dir"]
|
||||
self.durability_dir = str(Path(self.durability_dir).expanduser())
|
||||
self.durability_remote_dir = data["durability_remote_dir"]
|
||||
|
||||
self.logs_dir = data["logs_dir"]
|
||||
self.logs_dir = str(Path(self.logs_dir).expanduser())
|
||||
self.logs_remote_dir = data["logs_remote_dir"]
|
||||
|
||||
log.info("Initializing with config\n{}".format(
|
||||
json.dumps(data, indent=4, sort_keys=True)))
|
||||
|
||||
def master(self):
|
||||
return next(filter(lambda m: m.type == "master",
|
||||
self.workload_machines), None)
|
||||
|
||||
def workers(self):
|
||||
return list(filter(lambda m: m.type == "worker",
|
||||
self.workload_machines))
|
||||
|
||||
|
||||
class RemoteRunner:
|
||||
|
||||
def __init__(self, config):
|
||||
self._config = config
|
||||
self._ssh_args = ["-i", self._config.ssh_public_key]
|
||||
self._scp_args = ["-i", self._config.ssh_public_key]
|
||||
self._ssh_nohost_cmd = ["ssh"] + self._ssh_args + ["-p"]
|
||||
|
||||
def _run_cmd(self, cmd, daemon=False, **kwargs):
|
||||
if "stdout" not in kwargs:
|
||||
kwargs["stdout"] = open("/dev/null", "w")
|
||||
if "stderr" not in kwargs:
|
||||
kwargs["stderr"] = subprocess.PIPE
|
||||
|
||||
if "host" in kwargs:
|
||||
remote_host = kwargs.pop("host", None)
|
||||
if "user" in kwargs:
|
||||
user = kwargs.pop("user", self._config.remote_user)
|
||||
if "ssh_port" in kwargs:
|
||||
ssh_port = kwargs.pop("ssh_port", None)
|
||||
|
||||
if cmd[0] != "scp":
|
||||
assert remote_host is not None, "Remote host not specified"
|
||||
assert ssh_port is not None, "ssh port not specified"
|
||||
where = "{}@{}".format(user, remote_host)
|
||||
cmd = self._ssh_nohost_cmd + [str(ssh_port)] + [where] + cmd
|
||||
|
||||
log.info("Command: {}".format(cmd))
|
||||
|
||||
if not daemon:
|
||||
ret = subprocess.run(cmd, **kwargs)
|
||||
err = ret.stderr.decode("utf-8")
|
||||
if err != "":
|
||||
log.error("Command: {} - ERROR: {}".format(cmd, err))
|
||||
if kwargs["stdout"] == subprocess.PIPE:
|
||||
return (ret.returncode, ret.stdout.decode("utf-8"))
|
||||
return ret.returncode
|
||||
else:
|
||||
pid = subprocess.Popen(cmd, **kwargs)
|
||||
return pid.pid
|
||||
|
||||
def _mkdir(self, dir_name="tmp"):
|
||||
log.info("mkdir {}".format(dir_name))
|
||||
for machine in self._config.workload_machines:
|
||||
log.info("mkdir {} on {}".format(dir_name, machine.host))
|
||||
ret = self._run_cmd(["mkdir", "-p", dir_name],
|
||||
user=self._config.remote_user,
|
||||
host=machine.host, ssh_port=machine.ssh_port)
|
||||
log.info(ret)
|
||||
|
||||
def _listdir(self):
|
||||
# for testing purposes only
|
||||
log.info("listdir")
|
||||
machine = self._config.workload_machines[0]
|
||||
ret = self._run_cmd(["ls", "-la"],
|
||||
user=self._config.remote_user, host=machine.host,
|
||||
stdout=subprocess.PIPE, ssh_port=machine.ssh_port)
|
||||
log.info("\n{}".format(ret[1]))
|
||||
|
||||
def _memgraph_symlink(self, memgraph_version, machine):
|
||||
log.info("memgraph-symlink on {}".format(machine.host))
|
||||
# create or update a symlink if already exists
|
||||
ret = self._run_cmd(["ln", "-sf",
|
||||
os.path.join(self._config.memgraph_remote_dir,
|
||||
memgraph_version),
|
||||
os.path.join(self._config.memgraph_remote_dir,
|
||||
"memgraph")],
|
||||
user=self._config.remote_user,
|
||||
host=machine.host, ssh_port=machine.ssh_port)
|
||||
log.info(ret)
|
||||
|
||||
def memgraph_copy(self):
|
||||
log.info("memgraph-copy")
|
||||
self._mkdir(self._config.memgraph_remote_dir)
|
||||
self._mkdir(self._config.durability_remote_dir)
|
||||
self._mkdir(self._config.logs_remote_dir)
|
||||
memgraph_binary = os.path.realpath(
|
||||
self._config.memgraph_build_dir + "/memgraph")
|
||||
for machine in self._config.workload_machines:
|
||||
log.info("memgraph-copy on {}".format(machine.host))
|
||||
args = ["scp"] + self._scp_args + ["-P", str(machine.ssh_port)]
|
||||
args += [memgraph_binary,
|
||||
self._config.remote_user + "@" + machine.host + ":" +
|
||||
self._config.memgraph_remote_dir]
|
||||
ret = self._run_cmd(args)
|
||||
log.info(ret)
|
||||
self._memgraph_symlink(os.path.basename(memgraph_binary), machine)
|
||||
|
||||
def memgraph_clean(self):
|
||||
log.info("memgraph-clean")
|
||||
for machine in self._config.workload_machines:
|
||||
log.info("memgraph-clean on {}".format(machine.host))
|
||||
ret = self._run_cmd(["rm", "-rf",
|
||||
self._config.memgraph_remote_dir],
|
||||
user=self._config.remote_user,
|
||||
host=machine.host, ssh_port=machine.ssh_port)
|
||||
log.info(ret)
|
||||
|
||||
def durability_copy(self):
|
||||
log.info("durability-copy")
|
||||
self._mkdir(self._config.durability_remote_dir)
|
||||
for i, machine in enumerate(self._config.workload_machines):
|
||||
log.info("durability-copy on {}".format(machine.host))
|
||||
# copy durability dir
|
||||
args = ["scp", "-r"] + self._scp_args + ["-P",
|
||||
str(machine.ssh_port)]
|
||||
args += [os.path.join(self._config.durability_dir,
|
||||
"worker_{}/snapshots".format(i)),
|
||||
self._config.remote_user + "@" + machine.host + ":" +
|
||||
self._config.durability_remote_dir]
|
||||
ret = self._run_cmd(args)
|
||||
log.info(ret)
|
||||
|
||||
def durability_clean(self):
|
||||
log.info("durability-clean")
|
||||
for machine in self._config.workload_machines:
|
||||
log.info("durability-clean on {}".format(machine.host))
|
||||
ret = self._run_cmd(["rm", "-rf",
|
||||
self._config.durability_remote_dir],
|
||||
user=self._config.remote_user,
|
||||
host=machine.host, ssh_port=machine.ssh_port)
|
||||
log.info(ret)
|
||||
|
||||
def memgraph_master(self):
|
||||
log.info("memgraph-master")
|
||||
machine = self._config.master()
|
||||
assert machine is not None, "Unable to fetch master config"
|
||||
dir_cmd = ["cd", self._config.memgraph_remote_dir]
|
||||
memgraph_cmd = ["nohup", "./memgraph",
|
||||
"--master",
|
||||
"--master-host", machine.address,
|
||||
"--master-port", str(machine.port),
|
||||
"--durability-directory={}".format(
|
||||
self._config.durability_remote_dir),
|
||||
"--db-recover-on-startup=true",
|
||||
"--num-workers", str(machine.num_workers),
|
||||
"--log-file",
|
||||
os.path.join(self._config.logs_remote_dir,
|
||||
"log_worker_0")]
|
||||
cmd = dir_cmd + ["&&"] + memgraph_cmd
|
||||
ret = self._run_cmd(cmd, daemon=True,
|
||||
user=self._config.remote_user, host=machine.host,
|
||||
ssh_port=machine.ssh_port)
|
||||
log.info(ret)
|
||||
|
||||
def memgraph_workers(self):
|
||||
log.info("memgraph-workers")
|
||||
master = self._config.master()
|
||||
assert master is not None, "Unable to fetch master config"
|
||||
workers = self._config.workers()
|
||||
assert workers, "Unable to fetch workers config"
|
||||
for i, machine in enumerate(workers):
|
||||
dir_cmd = ["cd", self._config.memgraph_remote_dir]
|
||||
worker_cmd = ["nohup", "./memgraph",
|
||||
"--worker",
|
||||
"--master-host", master.address,
|
||||
"--master-port", str(master.port),
|
||||
"--worker-id", str(i + 1),
|
||||
"--worker-port", str(machine.port),
|
||||
"--worker-host", machine.address,
|
||||
"--durability-directory={}".format(
|
||||
self._config.durability_remote_dir),
|
||||
"--db-recover-on-startup=true",
|
||||
"--num-workers", str(machine.num_workers),
|
||||
"--log-file",
|
||||
os.path.join(self._config.logs_remote_dir,
|
||||
"log_worker_{}".format(i + 1))]
|
||||
cmd = dir_cmd + ["&&"] + worker_cmd
|
||||
ret = self._run_cmd(cmd, daemon=True,
|
||||
user=self._config.remote_user,
|
||||
host=machine.host, ssh_port=machine.ssh_port)
|
||||
log.info(ret)
|
||||
|
||||
def memgraph_terminate(self):
|
||||
log.info("memgraph-terminate")
|
||||
# only kill master - master stops workers
|
||||
machine = self._config.master()
|
||||
ret = self._run_cmd(["kill", "-15", "$(pgrep memgraph)"],
|
||||
user=self._config.remote_user,
|
||||
host=machine.host, ssh_port=machine.ssh_port)
|
||||
log.info(ret)
|
||||
|
||||
def collect_logs(self):
|
||||
log.info("collect-logs")
|
||||
for i, machine in enumerate(self._config.workload_machines):
|
||||
log.info("collect-log from {}".format(machine.host))
|
||||
args = ["scp"] + self._scp_args + ["-P", str(machine.ssh_port)]
|
||||
args += [self._config.remote_user + "@" + machine.host + ":" +
|
||||
os.path.join(self._config.logs_remote_dir,
|
||||
"log_worker_{}".format(i)),
|
||||
self._config.logs_dir]
|
||||
ret = self._run_cmd(args)
|
||||
log.info(ret)
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
config = Config(args.config)
|
||||
action = args.action.replace("-", "_")
|
||||
runner = RemoteRunner(config)
|
||||
action = getattr(runner, action)
|
||||
action()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user