diff --git a/tools/analyze_rpc_calls b/tools/analyze_rpc_calls
new file mode 100755
index 000000000..015ca7be2
--- /dev/null
+++ b/tools/analyze_rpc_calls
@@ -0,0 +1,194 @@
+#!/usr/bin/env python3
+import argparse
+import collections
+import dpkt
+import json
+import operator
+import socket
+import struct
+import tabulate
+
+# helpers
+
+def format_endpoint(addr, port):
+    return "{}:{}".format(socket.inet_ntoa(addr), port)
+
+
+class Connection:
+    # uint32_t channel_size
+    HEADER_FORMAT = "I"
+
+    # uint32_t message_number, uint32_t message_size
+    MESSAGE_FORMAT = "II"
+
+    # 8: boost archive string size
+    # 22: boost archive fixed string "serialization::archive"
+    # 17: boost archive magic bytes
+    BOOST_OFFSET = 8 + 22 + 17
+
+    def __init__(self):
+        self._handshake_done = False
+        self._channel = ""
+
+        self._data = bytes()
+        self._message = bytes()
+        self._ts = []
+
+        self._header_format_len = struct.calcsize(self.HEADER_FORMAT)
+        self._message_format_len = struct.calcsize(self.MESSAGE_FORMAT)
+
+        # pending request as (message type, timestamp of its first packet),
+        # or None while waiting for the next request
+        self._last = None
+        self._stats = collections.defaultdict(
+            lambda: {"duration": [], "size": []})
+
+    def _extract_channel(self):
+        if len(self._data) < self._header_format_len:
+            return False
+        chan_len = struct.unpack_from(self.HEADER_FORMAT, self._data)[0]
+        if len(self._data) < self._header_format_len + chan_len:
+            return False
+        offset = self._header_format_len
+        self._channel = self._data[offset:offset + chan_len].decode("utf-8")
+        self._data = bytes()
+        return True
+
+    def _extract_message(self):
+        if len(self._data) < self._message_format_len:
+            return False
+        msg_num, msg_len = struct.unpack_from(self.MESSAGE_FORMAT, self._data)
+        if len(self._data) < self._message_format_len + msg_len:
+            return False
+        offset = self._message_format_len
+        self._message = self._data[offset:offset + msg_len]
+        # trailing bytes are dropped: the handshake and each message are
+        # assumed not to share captured segments
+        self._data = bytes()
+        return True
+
+    def add_data(self, data, ts):
+        self._data += data
+        self._ts.append(ts)
+
+        if not self._handshake_done:
+            if not self._extract_channel():
+                return
+            self._handshake_done = True
+            self._ts = []
+
+        if not self._extract_message():
+            return
+
+        # the message type name is stored as a length-prefixed string
+        # right after the boost serialization archive header
+        message_type_size = struct.unpack_from("Q", self._message,
+                                               self.BOOST_OFFSET)[0]
+        message_type = struct.unpack_from(
+            "{}s".format(message_type_size), self._message,
+            self.BOOST_OFFSET + 8)[0].decode("utf-8")
+
+        # messages on a connection alternate between requests and
+        # responses, so pair each message with the previous one
+        if self._last is None:
+            self._last = (message_type, self._ts[0])
+        else:
+            req_type, req_ts = self._last
+            duration = self._ts[-1] - req_ts
+            self._stats[(req_type, message_type)]["duration"].append(duration)
+            self._stats[(req_type, message_type)]["size"].append(
+                len(self._message))
+            self._last = None
+
+        self._ts = []
+
+    def get_stats(self):
+        return self._stats
+
+
+class Server:
+    def __init__(self):
+        self._conns = collections.defaultdict(Connection)
+
+    def add_data(self, addr, data, ts):
+        self._conns[addr].add_data(data, ts)
+
+    def print_stats(self, machine_names, title, sort_by):
+        # aggregate per-connection statistics by client IP
+        stats = collections.defaultdict(lambda: collections.defaultdict(
+            lambda: {"duration": [], "size": []}))
+
+        for addr, conn in self._conns.items():
+            ip, _port = addr.split(":")
+            for rpc, connstats in conn.get_stats().items():
+                stats[ip][rpc]["duration"] += connstats["duration"]
+                stats[ip][rpc]["size"] += connstats["size"]
+
+        table = []
+        headers = ["RPC ({})".format(title), "Client", "Count", "Tmin (ms)",
+                   "Tavg (ms)", "Tmax (ms)", "Ttot (s)", "Smin (B)",
+                   "Savg (B)", "Smax (B)", "Stot (kiB)"]
+        sort_keys = ["rpc", "client", "count", "tmin", "tavg", "tmax", "ttot",
+                     "smin", "savg", "smax", "stot"]
+        for client in sorted(stats.keys()):
+            rpcs = stats[client]
+            for rpc, connstats in rpcs.items():
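+                # one table row per client and request/response pair:
+                # timings in ms (total in s), sizes in B (total in kiB)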
+                durs = connstats["duration"]
+                sizes = connstats["size"]
+                durs_sum = sum(durs)
+                sizes_sum = sum(sizes)
+                table.append(["{} / {}".format(*rpc), machine_names[client],
+                              len(durs), min(durs) * 1000,
+                              durs_sum / len(durs) * 1000,
+                              max(durs) * 1000, durs_sum, min(sizes),
+                              int(sizes_sum / len(sizes)), max(sizes),
+                              sizes_sum / 1024])
+        # successive stable sorts: the last field listed becomes the
+        # primary sort key
+        for sort_field in sort_by.split(","):
+            reverse = sort_field.endswith("-")
+            table.sort(key=operator.itemgetter(sort_keys.index(
+                sort_field.rstrip("+-"))), reverse=reverse)
+        print(tabulate.tabulate(table, headers=headers, tablefmt="psql",
+                                floatfmt=".3f"))
+
+
+# process logic
+
+parser = argparse.ArgumentParser(description="Generate RPC statistics from "
+                                 "a network traffic capture.")
+parser.add_argument("--sort-by", default="tavg+,count-,client+",
+                    help="comma separated list of fields which should be "
+                    "used to sort the data; each field can be suffixed with "
+                    "+ or - to indicate ascending or descending order; "
+                    "available fields: rpc, client, count, tmin, tavg, tmax, "
+                    "ttot, smin, savg, smax, stot")
+parser.add_argument("capfile", help="network traffic capture file")
+parser.add_argument("conffile", help="cluster config file")
+args = parser.parse_args()
+
+with open(args.conffile) as conffile:
+    config = json.load(conffile)
+
+last_worker = 0
+machine_names = {}
+server_addresses = []
+for machine in config["workload_machines"]:
+    name = machine["type"]
+    if name == "worker":
+        last_worker += 1
+        name += str(last_worker)
+    machine_names[machine["address"]] = name
+    server_addresses.append("{address}:{port}".format(**machine))
+
+servers = collections.defaultdict(Server)
+
+with open(args.capfile, "rb") as capfile:
+    for ts, pkt in dpkt.pcap.Reader(capfile):
+        eth = dpkt.ethernet.Ethernet(pkt)
+        if eth.type != dpkt.ethernet.ETH_TYPE_IP:
+            continue
+
+        ip = eth.data
+        if ip.p != dpkt.ip.IP_PROTO_TCP:
+            continue
+
+        # keep only non-empty TCP payloads sent to or from a known server
+        tcp = ip.data
+        src = format_endpoint(ip.src, tcp.sport)
+        dst = format_endpoint(ip.dst, tcp.dport)
+        if src not in server_addresses and dst not in server_addresses:
+            continue
+        if len(tcp.data) == 0:
+            continue
+
+        server = dst if dst in server_addresses else src
+        client = dst if dst not in server_addresses else src
+
+        servers[server].add_data(client, tcp.data, ts)
+
+for server in sorted(servers.keys()):
+    servers[server].print_stats(machine_names=machine_names,
+                                title=machine_names[server.split(":")[0]],
+                                sort_by=args.sort_by)
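
For reference, a minimal sketch (not part of the patch) of the framing that Connection.add_data() expects: a channel handshake, i.e. a length-prefixed channel name, followed by length-prefixed messages whose payloads are boost serialization archives. It assumes the Connection class above is in scope; the channel name, RPC type name, and the 17 archive header bytes are illustrative placeholders, not values from a real capture:

    import struct

    channel = b"rpc-channel"  # hypothetical channel name
    handshake = struct.pack("I", len(channel)) + channel

    # boost archive prefix matching BOOST_OFFSET = 8 + 22 + 17: an 8-byte
    # string size, the fixed "serialization::archive" tag, and 17 further
    # header bytes (zeroed placeholders here)
    archive = struct.pack("Q", 22) + b"serialization::archive" + b"\x00" * 17
    rpc_name = b"PingRequest"  # hypothetical RPC type name
    payload = archive + struct.pack("Q", len(rpc_name)) + rpc_name

    # uint32_t message_number, uint32_t message_size, then the payload
    message = struct.pack("II", 1, len(payload)) + payload

    conn = Connection()
    conn.add_data(handshake, ts=0.000)  # handshake carries no RPC
    conn.add_data(message, ts=0.001)    # becomes the pending request
    conn.add_data(message, ts=0.004)    # "response": completes the pair
    print(conn.get_stats())             # one (request, response) entry,
                                        # duration 0.003 s

The conffile is expected to be JSON with a "workload_machines" list whose entries carry "type", "address" and "port" keys; a run would then look like ./tools/analyze_rpc_calls capture.pcap cluster.json (file names illustrative).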