Make actions execution multithreaded

Summary:
In addition to making action execution multithreaded, this change adds
actions for running single-node Memgraph and for capturing traffic with tcpdump.

Reviewers: mferencevic, mtomic, buda

Reviewed By: mtomic, buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1205
This commit is contained in:
Marko Culinovic 2018-02-23 11:09:27 +01:00
parent d99724647a
commit 31fe4541c2

View File

@ -1,6 +1,9 @@
#!/usr/bin/env python3
import concurrent.futures
import itertools
import json
import logging
import multiprocessing
import os
import subprocess
from argparse import ArgumentParser
@ -51,6 +54,8 @@ Example json config:
}
],
"benchmark_machines":["distcardfraud1.memgraph.io"],
"statsd_address": "10.1.13.4",
"statsd_port" : "2500",
"remote_user": "memgraph",
"ssh_public_key" : "~/.ssh/id_rsa",
"memgraph_build_dir" : "~/Development/memgraph/build",
@ -58,7 +63,9 @@ Example json config:
"durability_dir" : "~/Development/memgraph/build/tests/manual/test_dir",
"durability_remote_dir" : "/home/memgraph/memgraph_remote/durability",
"logs_dir" : "~/Development/memgraph/logs",
"logs_remote_dir" : "/home/memgraph/memgraph_remote/logs"
"logs_remote_dir" : "/home/memgraph/memgraph_remote/logs",
"tcpdump_dir" : "~/Development/memgraph/tcpdumps",
"tcpdump_remote_dir" : "/home/memgraph/tcpdumps"
}
"""
@ -101,6 +108,8 @@ class Config:
self.workload_machines = [Machine(**config)
for config in data["workload_machines"]]
self.benchmark_machines = data["benchmark_machines"]
self.statsd_address = data["statsd_address"]
self.statsd_port = data["statsd_port"]
self.remote_user = data["remote_user"]
self.ssh_public_key = data["ssh_public_key"]
self.ssh_public_key = str(Path(self.ssh_public_key).expanduser())
@ -118,6 +127,10 @@ class Config:
self.logs_dir = str(Path(self.logs_dir).expanduser())
self.logs_remote_dir = data["logs_remote_dir"]
self.tcpdump_dir = data["tcpdump_dir"]
self.tcpdump_dir = str(Path(self.tcpdump_dir).expanduser())
self.tcpdump_remote_dir = data["tcpdump_remote_dir"]
log.info("Initializing with config\n{}".format(
json.dumps(data, indent=4, sort_keys=True)))
@ -137,48 +150,42 @@ class RemoteRunner:
self._ssh_args = ["-i", self._config.ssh_public_key]
self._scp_args = ["-i", self._config.ssh_public_key]
self._ssh_nohost_cmd = ["ssh"] + self._ssh_args + ["-p"]
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=multiprocessing.cpu_count())
# NOTE(review): this span is a unified-diff artifact -- the pre-change
# `_run_cmd` (kwargs-popping, `daemon` flag, Popen branch) and the
# post-change `_run_cmd` (explicit stdout/stderr/host/user/ssh_port
# parameters) are interleaved here without +/- markers. Only one version
# belongs in the real file; the second `def` (explicit parameters) is the
# post-change code. Comments below annotate both.
def _run_cmd(self, cmd, daemon=False, **kwargs):
# OLD version: defaults stdout to /dev/null and stderr to a pipe when the
# caller did not supply them.
if "stdout" not in kwargs:
kwargs["stdout"] = open("/dev/null", "w")
if "stderr" not in kwargs:
kwargs["stderr"] = subprocess.PIPE
# OLD version: pop ssh-routing kwargs so they are not forwarded to
# subprocess.run.
if "host" in kwargs:
remote_host = kwargs.pop("host", None)
if "user" in kwargs:
user = kwargs.pop("user", self._config.remote_user)
if "ssh_port" in kwargs:
ssh_port = kwargs.pop("ssh_port", None)
# NEW version: ssh routing is expressed as explicit keyword parameters;
# remaining **kwargs go straight to subprocess.run.
def _run_cmd(self, cmd, stdout=None, stderr=subprocess.PIPE,
host=None, user=None, ssh_port=None, **kwargs):
# Discard output by default (stdout handle is never closed here --
# presumably acceptable for a short-lived tool; TODO confirm).
if not stdout:
stdout = open("/dev/null", "w")
# scp commands embed their own destination; everything else is wrapped in
# `ssh -p <port> user@host <cmd>` and therefore needs host + port.
if cmd[0] != "scp":
assert remote_host is not None, "Remote host not specified"
assert host is not None, "Remote host not specified"
assert ssh_port is not None, "ssh port not specified"
where = "{}@{}".format(user, remote_host)
where = "{}@{}".format(user, host)
cmd = self._ssh_nohost_cmd + [str(ssh_port)] + [where] + cmd
log.info("Command: {}".format(cmd))
# NEW version: always run synchronously; log any stderr output.
ret = subprocess.run(cmd, stdout=stdout, stderr=stderr, **kwargs)
err = ret.stderr.decode("utf-8")
if err != "":
log.error("Command: {} - ERROR: {}".format(cmd, err))
# When the caller asked for captured stdout, return (rc, output).
if stdout == subprocess.PIPE:
return (ret.returncode, ret.stdout.decode("utf-8"))
return ret.returncode
# OLD version: synchronous branch mirrors the above; daemon branch used
# Popen and returned the child pid instead of a return code.
if not daemon:
ret = subprocess.run(cmd, **kwargs)
err = ret.stderr.decode("utf-8")
if err != "":
log.error("Command: {} - ERROR: {}".format(cmd, err))
if kwargs["stdout"] == subprocess.PIPE:
return (ret.returncode, ret.stdout.decode("utf-8"))
return ret.returncode
else:
pid = subprocess.Popen(cmd, **kwargs)
return pid.pid
def _mkdir_machine(self, machine, dir_name):
    """Create `dir_name` (with parents) on a single remote machine."""
    log.info("mkdir {} on {}".format(dir_name, machine.host))
    mkdir_cmd = ["mkdir", "-p", dir_name]
    status = self._run_cmd(mkdir_cmd,
                           user=self._config.remote_user,
                           host=machine.host,
                           ssh_port=machine.ssh_port)
    log.info(status)
# Create `dir_name` on every workload machine.
def _mkdir(self, dir_name="tmp"):
log.info("mkdir {}".format(dir_name))
# NOTE(review): diff artifact -- the old serial for-loop (removed lines)
# and the new thread-pool fan-out (added lines) are interleaved here
# without +/- markers; the executor.map form is the post-change code.
for machine in self._config.workload_machines:
log.info("mkdir {} on {}".format(dir_name, machine.host))
ret = self._run_cmd(["mkdir", "-p", dir_name],
user=self._config.remote_user,
host=machine.host, ssh_port=machine.ssh_port)
log.info(ret)
# Fan out one mkdir per machine; same dir_name for all of them.
fs = self.executor.map(self._mkdir_machine,
self._config.workload_machines,
itertools.repeat(dir_name))
# Drain the lazy map iterator so every remote command actually runs
# (and any worker exception is re-raised here).
for _ in fs: pass # noqa
def _listdir(self):
# for testing purposes only
@ -201,6 +208,16 @@ class RemoteRunner:
host=machine.host, ssh_port=machine.ssh_port)
log.info(ret)
def _memgraph_copy_machine(self, machine, memgraph_binary):
    """Copy the memgraph binary to one machine via scp, then symlink it."""
    log.info("memgraph-copy on {}".format(machine.host))
    destination = self._config.remote_user + "@" + machine.host + ":" + \
        self._config.memgraph_remote_dir
    scp_cmd = (["scp"] + self._scp_args
               + ["-P", str(machine.ssh_port)]
               + [memgraph_binary, destination])
    log.info(self._run_cmd(scp_cmd))
    # Point the well-known name at the freshly copied binary.
    self._memgraph_symlink(os.path.basename(memgraph_binary), machine)
# Distribute the locally built memgraph binary to all workload machines.
def memgraph_copy(self):
log.info("memgraph-copy")
# Ensure the remote binary/durability/log directories exist first.
self._mkdir(self._config.memgraph_remote_dir)
self._mkdir(self._config.logs_remote_dir)
# Resolve symlinks so the real build artifact is copied.
memgraph_binary = os.path.realpath(
self._config.memgraph_build_dir + "/memgraph")
# NOTE(review): diff artifact -- old serial scp loop and new
# thread-pool version interleaved without +/- markers; the
# executor.map form is the post-change code.
for machine in self._config.workload_machines:
log.info("memgraph-copy on {}".format(machine.host))
args = ["scp"] + self._scp_args + ["-P", str(machine.ssh_port)]
args += [memgraph_binary,
self._config.remote_user + "@" + machine.host + ":" +
self._config.memgraph_remote_dir]
ret = self._run_cmd(args)
log.info(ret)
self._memgraph_symlink(os.path.basename(memgraph_binary), machine)
# One parallel copy per machine, all with the same binary path.
fs = self.executor.map(self._memgraph_copy_machine,
self._config.workload_machines,
itertools.repeat(memgraph_binary))
# Drain to force execution and surface worker exceptions.
for _ in fs: pass # noqa
def memgraph_clean(self):
log.info("memgraph-clean")
@ -228,20 +240,25 @@ class RemoteRunner:
host=machine.host, ssh_port=machine.ssh_port)
log.info(ret)
def _durability_copy_machine(self, machine, i):
    """Copy worker i's snapshot directory to its machine via scp -r."""
    log.info("durability-copy on {}".format(machine.host))
    # copy durability dir
    snapshots = os.path.join(self._config.durability_dir,
                             "worker_{}/snapshots".format(i))
    destination = self._config.remote_user + "@" + machine.host + ":" + \
        self._config.durability_remote_dir
    scp_cmd = (["scp", "-r"] + self._scp_args
               + ["-P", str(machine.ssh_port)]
               + [snapshots, destination])
    log.info(self._run_cmd(scp_cmd))
# Distribute per-worker durability snapshots to all workload machines.
def durability_copy(self):
log.info("durability-copy")
self._mkdir(self._config.durability_remote_dir)
# NOTE(review): diff artifact -- old serial scp loop and new
# thread-pool version interleaved without +/- markers; the
# executor.map form is the post-change code.
for i, machine in enumerate(self._config.workload_machines):
log.info("durability-copy on {}".format(machine.host))
# copy durability dir
args = ["scp", "-r"] + self._scp_args + ["-P",
str(machine.ssh_port)]
args += [os.path.join(self._config.durability_dir,
"worker_{}/snapshots".format(i)),
self._config.remote_user + "@" + machine.host + ":" +
self._config.durability_remote_dir]
ret = self._run_cmd(args)
log.info(ret)
# itertools.count(0) re-creates the enumerate() index per machine.
fs = self.executor.map(self._durability_copy_machine,
self._config.workload_machines,
itertools.count(0))
# Drain to force execution and surface worker exceptions.
for _ in fs: pass # noqa
def durability_clean(self):
log.info("durability-clean")
@ -253,10 +270,47 @@ class RemoteRunner:
host=machine.host, ssh_port=machine.ssh_port)
log.info(ret)
def _is_process_running(self, process_name, machine):
    """Return True iff `pgrep process_name` succeeds on the remote machine."""
    exit_code = self._run_cmd(["pgrep", process_name],
                              user=self._config.remote_user,
                              host=machine.host,
                              ssh_port=machine.ssh_port)
    log.info(exit_code)
    # pgrep exits 0 only when at least one matching process exists.
    return exit_code == 0
# Start a single-node (non-distributed) memgraph on the master machine.
def memgraph_single_node(self):
log.info("memgraph-single-node")
machine = self._config.master()
assert machine is not None, "Unable to fetch memgraph config"
# Refuse to start a second instance on the same machine.
is_running = self._is_process_running("memgraph", machine)
assert not is_running, "Memgraph already running on machine: "\
" {}".format(machine)
# Run from the remote install dir so ./memgraph resolves.
dir_cmd = ["cd", self._config.memgraph_remote_dir]
memgraph_cmd = ["nohup", "./memgraph",
"--durability-directory={}".format(
self._config.durability_remote_dir),
"--db-recover-on-startup=true",
"--num-workers", str(machine.num_workers),
"--log-file",
os.path.join(self._config.logs_remote_dir,
"log_worker_0"),
"--query-vertex-count-to-expand-existing", "-1"]
# Wrap in `eval "..."` with output redirected and backgrounded so the
# remote process keeps running after the ssh session ends.
cmd = ["eval", "\""] + dir_cmd + ["&&"] + memgraph_cmd + \
["\"", "&> /dev/null < /dev/null &"]
ret = self._run_cmd(cmd,
user=self._config.remote_user, host=machine.host,
ssh_port=machine.ssh_port)
log.info(ret)
# Verify the daemon actually came up before returning.
is_running = self._is_process_running("memgraph", machine)
assert is_running, "Memgraph failed to start on machine: "\
" {}".format(machine)
# Start the distributed-memgraph master on the configured master machine.
def memgraph_master(self):
log.info("memgraph-master")
machine = self._config.master()
assert machine is not None, "Unable to fetch master config"
# Refuse to start a second instance on the same machine.
is_running = self._is_process_running("memgraph", machine)
assert not is_running, "Memgraph already running on machine: "\
" {}".format(machine)
dir_cmd = ["cd", self._config.memgraph_remote_dir]
memgraph_cmd = ["nohup", "./memgraph",
"--master",
# NOTE(review): a diff hunk header splits the command list here; the
# master host/port flags from the pre-context are not visible in this view.
@ -266,14 +320,52 @@ RemoteRunner:
self._config.durability_remote_dir),
"--db-recover-on-startup=true",
"--num-workers", str(machine.num_workers),
"--statsd-address", str(self._config.statsd_address),
"--statsd-port", str(self._config.statsd_port),
"--log-file",
os.path.join(self._config.logs_remote_dir,
# NOTE(review): diff artifact below -- the old `daemon=True` launch and
# the new `eval "... &"` launch are interleaved without +/- markers; the
# eval/backgrounded form is the post-change code.
"log_worker_0")]
cmd = dir_cmd + ["&&"] + memgraph_cmd
ret = self._run_cmd(cmd, daemon=True,
"log_worker_0"),
"--query-vertex-count-to-expand-existing", "-1"]
cmd = ["eval", "\""] + dir_cmd + ["&&"] + memgraph_cmd + \
["\"", "&> /dev/null < /dev/null &"]
ret = self._run_cmd(cmd,
user=self._config.remote_user, host=machine.host,
ssh_port=machine.ssh_port)
log.info(ret)
# Verify the daemon actually came up before returning.
is_running = self._is_process_running("memgraph", machine)
assert is_running, "Memgraph failed to start on machine: "\
" {}".format(machine)
# Start one memgraph worker (id `i`) on `machine`, connecting to `master`.
def _memgraph_worker(self, machine, master, i):
# Refuse to start a second instance on the same machine.
is_running = self._is_process_running("memgraph", machine)
assert not is_running, "Memgraph already running on machine: "\
" {}".format(machine)
# Run from the remote install dir so ./memgraph resolves.
dir_cmd = ["cd", self._config.memgraph_remote_dir]
worker_cmd = ["nohup", "./memgraph",
"--worker",
"--master-host", master.address,
"--master-port", str(master.port),
"--worker-id", str(i),
"--worker-port", str(machine.port),
"--worker-host", machine.address,
"--durability-directory={}".format(
self._config.durability_remote_dir),
"--db-recover-on-startup=true",
"--num-workers", str(machine.num_workers),
"--statsd-address", str(self._config.statsd_address),
"--statsd-port", str(self._config.statsd_port),
"--log-file",
os.path.join(self._config.logs_remote_dir,
"log_worker_{}".format(i))]
# Wrap in `eval "..."` with output redirected and backgrounded so the
# remote process keeps running after the ssh session ends.
cmd = ["eval", "\""] + dir_cmd + ["&&"] + worker_cmd + \
["\"", "&> /dev/null < /dev/null &"]
ret = self._run_cmd(cmd,
user=self._config.remote_user,
host=machine.host, ssh_port=machine.ssh_port)
log.info(ret)
# Verify the worker actually came up before returning.
is_running = self._is_process_running("memgraph", machine)
assert is_running, "Memgraph failed to start on machine: "\
" {}".format(machine)
# Start memgraph workers on every non-master workload machine.
def memgraph_workers(self):
log.info("memgraph-workers")
# NOTE(review): a diff hunk header interrupts here; the line fetching
# `master` from config sits in the invisible pre-context.
@ -281,47 +373,98 @@ class RemoteRunner:
assert master is not None, "Unable to fetch master config"
workers = self._config.workers()
assert workers, "Unable to fetch workers config"
# NOTE(review): diff artifact -- the old serial per-worker launch loop
# and the new thread-pool fan-out are interleaved without +/- markers;
# the executor.map form is the post-change code.
for i, machine in enumerate(workers):
dir_cmd = ["cd", self._config.memgraph_remote_dir]
worker_cmd = ["nohup", "./memgraph",
"--worker",
"--master-host", master.address,
"--master-port", str(master.port),
"--worker-id", str(i + 1),
"--worker-port", str(machine.port),
"--worker-host", machine.address,
"--durability-directory={}".format(
self._config.durability_remote_dir),
"--db-recover-on-startup=true",
"--num-workers", str(machine.num_workers),
"--log-file",
os.path.join(self._config.logs_remote_dir,
"log_worker_{}".format(i + 1))]
cmd = dir_cmd + ["&&"] + worker_cmd
ret = self._run_cmd(cmd, daemon=True,
user=self._config.remote_user,
host=machine.host, ssh_port=machine.ssh_port)
log.info(ret)
# machines[1:] skips the master; worker ids start at 1 (master is 0).
fs = self.executor.map(self._memgraph_worker,
self._config.workload_machines[1:],
itertools.repeat(master),
itertools.count(1))
# Drain to force execution and surface worker exceptions.
for _ in fs: pass # noqa
def _kill_process(self, process, signal, machine):
    """Send `signal` to every remote process matching `process`.

    `process` is the process name passed to pgrep, `signal` is a bare
    integer signal number (callers pass 15 / 9), and `machine` supplies
    the ssh host and port.  The command runs under sudo over ssh.
    """
    # BUG FIX: the signal must be a flag ("-15"), not a bare argument --
    # `kill 15 $(pgrep memgraph)` would try to kill PID 15 instead of
    # sending SIGTERM (the pre-refactor code correctly used "kill -15").
    ret = self._run_cmd(["sudo", "kill", "-{}".format(signal),
                         "$(pgrep {})".format(process)],
                        user=self._config.remote_user,
                        host=machine.host, ssh_port=machine.ssh_port)
    log.info(ret)
# Gracefully stop the cluster with SIGTERM sent to the master only.
def memgraph_terminate(self):
log.info("memgraph-terminate")
# only kill master - master stops workers
machine = self._config.master()
# NOTE(review): diff artifact -- the old inline kill command and the new
# `_kill_process` helper call are both present; the helper call is the
# post-change code.
ret = self._run_cmd(["kill", "-15", "$(pgrep memgraph)"],
user=self._config.remote_user,
host=machine.host, ssh_port=machine.ssh_port)
self._kill_process("memgraph", 15, machine)
def memgraph_kill(self):
    """Forcefully kill memgraph (signal 9) on the master machine."""
    log.info("memgraph-kill")
    # Killing the master is enough: the master stops its workers.
    master_machine = self._config.master()
    self._kill_process("memgraph", 9, master_machine)
def _collect_log(self, machine, i):
    """Fetch worker i's remote log file into the local logs directory."""
    log.info("collect-log from {}".format(machine.host))
    remote_log = os.path.join(self._config.logs_remote_dir,
                              "log_worker_{}".format(i))
    source = self._config.remote_user + "@" + machine.host + ":" + remote_log
    scp_cmd = (["scp"] + self._scp_args
               + ["-P", str(machine.ssh_port)]
               + [source, self._config.logs_dir])
    log.info(self._run_cmd(scp_cmd))
# Fetch every machine's worker log into the local logs directory.
def collect_logs(self):
log.info("collect-logs")
# NOTE(review): diff artifact -- old serial scp loop and new thread-pool
# version interleaved without +/- markers; the executor.map form is the
# post-change code.
for i, machine in enumerate(self._config.workload_machines):
log.info("collect-log from {}".format(machine.host))
args = ["scp"] + self._scp_args + ["-P", str(machine.ssh_port)]
args += [self._config.remote_user + "@" + machine.host + ":" +
os.path.join(self._config.logs_remote_dir,
"log_worker_{}".format(i)),
self._config.logs_dir]
ret = self._run_cmd(args)
# itertools.count(0) re-creates the enumerate() index per machine.
fs = self.executor.map(self._collect_log,
self._config.workload_machines,
itertools.count(0))
# Drain to force execution and surface worker exceptions.
for _ in fs: pass # noqa
def _tcpdump_machine(self, machine, i):
    """Start tcpdump on eth0 on one machine, dumping to dump_worker_<i>."""
    dump_path = os.path.join(self._config.tcpdump_remote_dir,
                             "dump_worker_{}".format(i))
    capture_cmd = ["sudo", "tcpdump", "-i", "eth0", "-w", dump_path]
    # Wrap in eval with redirected output and backgrounding so the capture
    # keeps running after the ssh session returns.
    wrapped = ["eval", "\""] + capture_cmd + \
        ["\"", "&> /dev/null < /dev/null &"]
    status = self._run_cmd(wrapped,
                           user=self._config.remote_user,
                           host=machine.host,
                           ssh_port=machine.ssh_port)
    log.info(status)
def tcpdump_start(self):
    """Start tcpdump captures on all workload machines in parallel."""
    log.info("tcpdump-start")
    self._mkdir(self._config.tcpdump_remote_dir)
    machines = self._config.workload_machines
    futures = self.executor.map(self._tcpdump_machine, machines,
                                itertools.count(0))
    # Drain the lazy iterator so every remote command actually runs.
    list(futures)
def _tcpdump_collect_machine(self, machine, i):
    """Stop tcpdump on one machine and scp its dump file back locally."""
    log.info("collect-tcpdump from {}".format(machine.host))
    # first kill tcpdump process
    kill_status = self._run_cmd(["nohup", "sudo", "kill", "-15",
                                 "$(pgrep tcpdump)"],
                                user=self._config.remote_user,
                                host=machine.host,
                                ssh_port=machine.ssh_port)
    log.info(kill_status)
    # Then copy the finished dump file to the local tcpdump directory.
    remote_dump = os.path.join(self._config.tcpdump_remote_dir,
                               "dump_worker_{}".format(i))
    source = self._config.remote_user + "@" + machine.host + ":" + remote_dump
    scp_cmd = (["scp"] + self._scp_args
               + ["-P", str(machine.ssh_port)]
               + [source, self._config.tcpdump_dir])
    log.info(self._run_cmd(scp_cmd))
def tcpdump_collect(self):
    """Stop captures and gather dump files from all machines in parallel."""
    log.info("tcpdump-collect")
    futures = self.executor.map(self._tcpdump_collect_machine,
                                self._config.workload_machines,
                                itertools.count(0))
    # Drain the lazy iterator so every remote command actually runs.
    list(futures)
def _tcpdump_clean_machine(self, machine):
    """Remove the remote tcpdump directory on a single machine."""
    log.info("tcpdump-clean on {}".format(machine.host))
    ret = self._run_cmd(["rm", "-rf",
                         self._config.tcpdump_remote_dir],
                        user=self._config.remote_user,
                        host=machine.host, ssh_port=machine.ssh_port)
    log.info(ret)

def tcpdump_clean(self):
    """Delete the remote tcpdump directory on every workload machine.

    Consistency: every other per-machine action here fans out through
    self.executor; this was the last one still looping serially.
    """
    log.info("tcpdump-clean")
    fs = self.executor.map(self._tcpdump_clean_machine,
                           self._config.workload_machines)
    # Drain to force execution and surface worker exceptions.
    for _ in fs: pass  # noqa