diff --git a/customers/strata2018/card_fraud_config.json b/customers/strata2018/card_fraud_config.json
index d27b84b98..300949fc6 100644
--- a/customers/strata2018/card_fraud_config.json
+++ b/customers/strata2018/card_fraud_config.json
@@ -2,18 +2,19 @@
   "indexes" : ["Card.id", "Pos.id", "Transaction.fraud_reported"],
   "nodes" : [
     {
-      "count_per_worker" : 10,
+      "count_per_worker" : 1250000,
       "label" : "Card"
     },
     {
-      "count_per_worker" : 10,
+      "count_per_worker" : 1250000,
       "label" : "Pos"
     },
     {
-      "count_per_worker" : 20,
+      "count_per_worker" : 2500000,
       "label" : "Transaction"
     }
   ],
   "compromised_pos_probability" : 0.2,
+  "fraud_reported_probability" : 0.1,
   "hop_probability" : 0.1
 }
diff --git a/tests/manual/card_fraud_generate_snapshot.cpp b/tests/manual/card_fraud_generate_snapshot.cpp
index 5b0db88ff..053933982 100644
--- a/tests/manual/card_fraud_generate_snapshot.cpp
+++ b/tests/manual/card_fraud_generate_snapshot.cpp
@@ -1,3 +1,4 @@
+#include
 #include
 #include
 #include
@@ -17,8 +18,8 @@
 
 DEFINE_string(num_workers, "1",
               "Number of distributed workers (including master)");
-DEFINE_string(durability_directory_prefix, "tmp",
-              "Prefix for durability directories");
+DEFINE_string(dir, "tmp",
+              "Directory for storing workers durability directories.");
 DEFINE_string(config, "", "Path to config JSON file");
 
 /**
@@ -41,6 +42,7 @@ DEFINE_string(config, "", "Path to config JSON file");
  *     }
  *   ],
  *   "compromised_pos_probability" : 0.2,
+ *   "fraud_reported_probability" : 0.1,
  *   "hop_percentage" : 0.1
  *}
  */
@@ -359,6 +361,7 @@ int main(int argc, char **argv) {
   const fs::path kSnapshotDir = "snapshots";
 
   double compromised_pos_probability = config["compromised_pos_probability"];
+  double fraud_reported_probability = config["fraud_reported_probability"];
   double hop_probability = config["hop_probability"];
 
   LOG(INFO) << "Creating snapshots with config: ";
@@ -450,8 +453,8 @@ int main(int argc, char **argv) {
   // Write snapshot files.
   LOG(INFO) << "Writing snapshots...";
   for (int worker_id = 0; worker_id < num_workers; ++worker_id) {
-    const fs::path durability_dir = FLAGS_durability_directory_prefix +
-                                    "_worker_" + std::to_string(worker_id);
+    const fs::path durability_dir =
+        FLAGS_dir / fs::path("worker_" + std::to_string(worker_id));
     if (!durability::EnsureDir(durability_dir / kSnapshotDir)) {
       LOG(ERROR) << "Unable to create durability directory!";
       exit(0);
@@ -501,11 +504,9 @@ int main(int argc, char **argv) {
         DCHECK(out_edges[1].type == GraphState::Edge::Type::AT);
         DCHECK(in_edges.size() == 0);
         bool fraud_reported = false;
-        if (state.IsCompromisedCard(out_edges[1].to.first,
-                                    out_edges[1].to.second) &&
-            state.IsCompromisedPos(out_edges[0].to.first,
-                                   out_edges[0].to.second)) {
-          fraud_reported = value_generator.Bernoulli(0.1);
+        if (state.IsCompromisedCard(out_edges[0].to.first,
+                                    out_edges[0].to.second)) {
+          fraud_reported = value_generator.Bernoulli(fraud_reported_probability);
         }
         auto props = value_generator.MakeTxProperties(fraud_reported, worker_id);
         writer.WriteNode(tx_id, std::vector{kLabelTransaction},
diff --git a/tools/manage_distributed_card_fraud b/tools/manage_distributed_card_fraud
new file mode 100755
index 000000000..c502dd82f
--- /dev/null
+++ b/tools/manage_distributed_card_fraud
@@ -0,0 +1,348 @@
+#!/usr/bin/env python3
+"""
+Script provides helper actions for strata card fraud demo:
+    1) Copy memgraph to cluster
+    2) Copy durability directories to cluster
+    3) Start master and workers
+    4) Stop memgraph cluster
+    5) Clean memgraph directories
+    6) Clean durability directories
+
+Cluster config should be provided in separate json file.
+
+Example usage:
+    ```./manage_distributed_card_fraud memgraph-copy --config config.json```
+
+Example json config:
+{
+    "workload_machines":
+    [
+        {
+            "host" : "distcardfraud2.memgraph.io",
+            "type" : "master",
+            "address" : "10.1.13.5",
+            "port" : 10000,
+            "num_workers" : 4,
+            "ssh_port" : 60022
+        },
+        {
+            "host" : "distcardfraud3.memgraph.io",
+            "type" : "worker",
+            "address" : "10.1.13.6",
+            "port" : 10001,
+            "num_workers" : 2,
+            "ssh_port" : 60022
+        },
+        {
+            "host" : "distcardfraud4.memgraph.io",
+            "type" : "worker",
+            "address" : "10.1.13.7",
+            "port" : 10002,
+            "num_workers" : 2,
+            "ssh_port" : 60022
+        }
+    ],
+    "benchmark_machines":["distcardfraud1.memgraph.io"],
+    "remote_user": "memgraph",
+    "ssh_public_key" : "~/.ssh/id_rsa",
+    "memgraph_build_dir" : "~/Development/memgraph/build",
+    "memgraph_remote_dir" : "/home/memgraph/memgraph_remote",
+    "durability_dir" : "~/Development/memgraph/build/tests/manual/test_dir",
+    "durability_remote_dir" : "/home/memgraph/memgraph_remote/durability",
+    "logs_dir" : "~/Development/memgraph/logs",
+    "logs_remote_dir" : "/home/memgraph/memgraph_remote/logs"
+}
+"""
+
+import json
+import logging
+import os
+import subprocess
+from argparse import ArgumentParser
+from collections import namedtuple
+from pathlib import Path
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger("RemoteRunner")
+
+
+def parse_args():
+    """
+    Parse command line arguments
+    """
+    parser = ArgumentParser(description=__doc__)
+    actions = [func for func in dir(RemoteRunner)
+               if callable(getattr(RemoteRunner, func))]
+    actions = [action.replace('_', '-') for action in actions
+               if not action.startswith('_')]
+
+    parser.add_argument("action", metavar="action", choices=actions,
+                        help=", ".join(actions))
+    parser.add_argument("--config", default="config.json",
+                        help="Config for cluster.")
+    return parser.parse_args()
+
+
+class Machine(namedtuple('Machine', ['host', 'type', 'address',
+                                     'port', 'num_workers', 'ssh_port'])):
+    __slots__ = ()  # prevent creation of instance dictionaries
+
+    def __init__(self, **kwargs):
+        # Fields are set by namedtuple's __new__; only validate types here.
+        assert isinstance(self.port, int), "port must be an integer"
+        assert isinstance(self.num_workers, int), \
+            "num_workers must be an integer"
+        assert isinstance(self.ssh_port, int), "ssh_port must be an integer"
+
+
+class Config:
+
+    def __init__(self, config_file):
+        with open(config_file) as config_fd:
+            data = json.load(config_fd)
+        self.workload_machines = [Machine(**config)
+                                  for config in data["workload_machines"]]
+        self.benchmark_machines = data["benchmark_machines"]
+        self.remote_user = data["remote_user"]
+        self.ssh_public_key = data["ssh_public_key"]
+        self.ssh_public_key = str(Path(self.ssh_public_key).expanduser())
+
+        self.memgraph_build_dir = data["memgraph_build_dir"]
+        self.memgraph_build_dir = str(
+            Path(self.memgraph_build_dir).expanduser())
+        self.memgraph_remote_dir = data["memgraph_remote_dir"]
+
+        self.durability_dir = data["durability_dir"]
+        self.durability_dir = str(Path(self.durability_dir).expanduser())
+        self.durability_remote_dir = data["durability_remote_dir"]
+
+        self.logs_dir = data["logs_dir"]
+        self.logs_dir = str(Path(self.logs_dir).expanduser())
+        self.logs_remote_dir = data["logs_remote_dir"]
+
+        log.info("Initializing with config\n{}".format(
+            json.dumps(data, indent=4, sort_keys=True)))
+
+    def master(self):
+        return next(filter(lambda m: m.type == "master",
+                           self.workload_machines), None)
+
+    def workers(self):
+        return list(filter(lambda m: m.type == "worker",
+                           self.workload_machines))
+
+
+class RemoteRunner:
+
+    def __init__(self, config):
+        self._config = config
+        self._ssh_args = ["-i", self._config.ssh_public_key]
+        self._scp_args = ["-i", self._config.ssh_public_key]
+        self._ssh_nohost_cmd = ["ssh"] + self._ssh_args + ["-p"]
+
+    def _run_cmd(self, cmd, daemon=False, **kwargs):
+        """
+        Run `cmd` and return its exit code, a (exit code, stdout) tuple when
+        stdout is piped, or the pid of the spawned process when `daemon` is
+        set.
+
+        Commands other than scp are executed over ssh using the `host`,
+        `user` and `ssh_port` keyword arguments; all remaining keyword
+        arguments are forwarded to subprocess.
+        """
+        if "stdout" not in kwargs:
+            kwargs["stdout"] = subprocess.DEVNULL
+        if "stderr" not in kwargs:
+            kwargs["stderr"] = subprocess.PIPE
+
+        # Always strip ssh-only options so they are never passed to subprocess.
+        remote_host = kwargs.pop("host", None)
+        user = kwargs.pop("user", self._config.remote_user)
+        ssh_port = kwargs.pop("ssh_port", None)
+
+        if cmd[0] != "scp":
+            assert remote_host is not None, "Remote host not specified"
+            assert ssh_port is not None, "ssh port not specified"
+            where = "{}@{}".format(user, remote_host)
+            cmd = self._ssh_nohost_cmd + [str(ssh_port)] + [where] + cmd
+
+        log.info("Command: {}".format(cmd))
+
+        if not daemon:
+            ret = subprocess.run(cmd, **kwargs)
+            err = ret.stderr.decode("utf-8") if ret.stderr else ""
+            if err != "":
+                log.error("Command: {} - ERROR: {}".format(cmd, err))
+            if kwargs["stdout"] == subprocess.PIPE:
+                return (ret.returncode, ret.stdout.decode("utf-8"))
+            return ret.returncode
+        else:
+            pid = subprocess.Popen(cmd, **kwargs)
+            return pid.pid
+
+    def _mkdir(self, dir_name="tmp"):
+        log.info("mkdir {}".format(dir_name))
+        for machine in self._config.workload_machines:
+            log.info("mkdir {} on {}".format(dir_name, machine.host))
+            ret = self._run_cmd(["mkdir", "-p", dir_name],
+                                user=self._config.remote_user,
+                                host=machine.host, ssh_port=machine.ssh_port)
+            log.info(ret)
+
+    def _listdir(self):
+        # for testing purposes only
+        log.info("listdir")
+        machine = self._config.workload_machines[0]
+        ret = self._run_cmd(["ls", "-la"],
+                            user=self._config.remote_user, host=machine.host,
+                            stdout=subprocess.PIPE, ssh_port=machine.ssh_port)
+        log.info("\n{}".format(ret[1]))
+
+    def _memgraph_symlink(self, memgraph_version, machine):
+        log.info("memgraph-symlink on {}".format(machine.host))
+        # create or update a symlink if already exists
+        ret = self._run_cmd(["ln", "-sf",
+                             os.path.join(self._config.memgraph_remote_dir,
+                                          memgraph_version),
+                             os.path.join(self._config.memgraph_remote_dir,
+                                          "memgraph")],
+                            user=self._config.remote_user,
+                            host=machine.host, ssh_port=machine.ssh_port)
+        log.info(ret)
+
+    def memgraph_copy(self):
+        log.info("memgraph-copy")
+        self._mkdir(self._config.memgraph_remote_dir)
+        self._mkdir(self._config.durability_remote_dir)
+        self._mkdir(self._config.logs_remote_dir)
+        memgraph_binary = os.path.realpath(
+            self._config.memgraph_build_dir + "/memgraph")
+        for machine in self._config.workload_machines:
+            log.info("memgraph-copy on {}".format(machine.host))
+            args = ["scp"] + self._scp_args + ["-P", str(machine.ssh_port)]
+            args += [memgraph_binary,
+                     self._config.remote_user + "@" + machine.host + ":" +
+                     self._config.memgraph_remote_dir]
+            ret = self._run_cmd(args)
+            log.info(ret)
+            self._memgraph_symlink(os.path.basename(memgraph_binary), machine)
+
+    def memgraph_clean(self):
+        log.info("memgraph-clean")
+        for machine in self._config.workload_machines:
+            log.info("memgraph-clean on {}".format(machine.host))
+            ret = self._run_cmd(["rm", "-rf",
+                                 self._config.memgraph_remote_dir],
+                                user=self._config.remote_user,
+                                host=machine.host, ssh_port=machine.ssh_port)
+            log.info(ret)
+
+    def durability_copy(self):
+        log.info("durability-copy")
+        self._mkdir(self._config.durability_remote_dir)
+        for i, machine in enumerate(self._config.workload_machines):
+            log.info("durability-copy on {}".format(machine.host))
+            # copy durability dir
+            args = ["scp", "-r"] + self._scp_args + ["-P",
+                                                     str(machine.ssh_port)]
+            args += [os.path.join(self._config.durability_dir,
+                                  "worker_{}/snapshots".format(i)),
+                     self._config.remote_user + "@" + machine.host + ":" +
+                     self._config.durability_remote_dir]
+            ret = self._run_cmd(args)
+            log.info(ret)
+
+    def durability_clean(self):
+        log.info("durability-clean")
+        for machine in self._config.workload_machines:
+            log.info("durability-clean on {}".format(machine.host))
+            ret = self._run_cmd(["rm", "-rf",
+                                 self._config.durability_remote_dir],
+                                user=self._config.remote_user,
+                                host=machine.host, ssh_port=machine.ssh_port)
+            log.info(ret)
+
+    def memgraph_master(self):
+        log.info("memgraph-master")
+        machine = self._config.master()
+        assert machine is not None, "Unable to fetch master config"
+        dir_cmd = ["cd", self._config.memgraph_remote_dir]
+        memgraph_cmd = ["nohup", "./memgraph",
+                        "--master",
+                        "--master-host", machine.address,
+                        "--master-port", str(machine.port),
+                        "--durability-directory={}".format(
+                            self._config.durability_remote_dir),
+                        "--db-recover-on-startup=true",
+                        "--num-workers", str(machine.num_workers),
+                        "--log-file",
+                        os.path.join(self._config.logs_remote_dir,
+                                     "log_worker_0")]
+        cmd = dir_cmd + ["&&"] + memgraph_cmd
+        ret = self._run_cmd(cmd, daemon=True,
+                            user=self._config.remote_user, host=machine.host,
+                            ssh_port=machine.ssh_port)
+        log.info(ret)
+
+    def memgraph_workers(self):
+        log.info("memgraph-workers")
+        master = self._config.master()
+        assert master is not None, "Unable to fetch master config"
+        workers = self._config.workers()
+        assert workers, "Unable to fetch workers config"
+        for i, machine in enumerate(workers):
+            dir_cmd = ["cd", self._config.memgraph_remote_dir]
+            worker_cmd = ["nohup", "./memgraph",
+                          "--worker",
+                          "--master-host", master.address,
+                          "--master-port", str(master.port),
+                          "--worker-id", str(i + 1),
+                          "--worker-port", str(machine.port),
+                          "--worker-host", machine.address,
+                          "--durability-directory={}".format(
+                              self._config.durability_remote_dir),
+                          "--db-recover-on-startup=true",
+                          "--num-workers", str(machine.num_workers),
+                          "--log-file",
+                          os.path.join(self._config.logs_remote_dir,
+                                       "log_worker_{}".format(i + 1))]
+            cmd = dir_cmd + ["&&"] + worker_cmd
+            ret = self._run_cmd(cmd, daemon=True,
+                                user=self._config.remote_user,
+                                host=machine.host, ssh_port=machine.ssh_port)
+            log.info(ret)
+
+    def memgraph_terminate(self):
+        log.info("memgraph-terminate")
+        # only kill master - master stops workers
+        machine = self._config.master()
+        assert machine is not None, "Unable to fetch master config"
+        ret = self._run_cmd(["kill", "-15", "$(pgrep memgraph)"],
+                            user=self._config.remote_user,
+                            host=machine.host, ssh_port=machine.ssh_port)
+        log.info(ret)
+
+    def collect_logs(self):
+        log.info("collect-logs")
+        for i, machine in enumerate(self._config.workload_machines):
+            log.info("collect-log from {}".format(machine.host))
+            args = ["scp"] + self._scp_args + ["-P", str(machine.ssh_port)]
+            args += [self._config.remote_user + "@" + machine.host + ":" +
+                     os.path.join(self._config.logs_remote_dir,
+                                  "log_worker_{}".format(i)),
+                     self._config.logs_dir]
+            ret = self._run_cmd(args)
+            log.info(ret)
+
+
+def main():
+    args = parse_args()
+    config = Config(args.config)
+    action = args.action.replace("-", "_")
+    runner = RemoteRunner(config)
+    action = getattr(runner, action)
+    action()
+
+
+if __name__ == "__main__":
+    main()