#!/usr/bin/env python3

import argparse
import collections
import json
import operator
import socket
import struct

import dpkt
import tabulate


# helpers
def format_endpoint(addr, port):
    return "{}:{}".format(socket.inet_ntoa(addr), port)


class Connection:
    """Reassembles one client connection and collects per-RPC statistics."""

    # uint32_t channel_size
    HEADER_FORMAT = "I"
    # uint32_t message_number, uint32_t message_size
    MESSAGE_FORMAT = "II"
    # 8: boost archive string size
    # 22: boost archive fixed string "serialization::archive"
    # 17: boost archive magic bytes
    BOOST_OFFSET = 8 + 22 + 17

    def __init__(self):
        self._handshake_done = False
        self._channel = ""
        self._data = bytes()
        self._message = bytes()
        self._ts = []
        self._header_format_len = struct.calcsize(self.HEADER_FORMAT)
        self._message_format_len = struct.calcsize(self.MESSAGE_FORMAT)
        self._last = None
        self._stats = collections.defaultdict(
            lambda: {"duration": [], "size": []})

    def _extract_channel(self):
        if len(self._data) < self._header_format_len:
            return False
        chan_len = struct.unpack_from(self.HEADER_FORMAT, self._data)[0]
        if len(self._data) < self._header_format_len + chan_len:
            return False
        # Decode exactly chan_len bytes; the buffer is reset afterwards.
        self._channel = self._data[
            self._header_format_len:self._header_format_len + chan_len
        ].decode("utf-8")
        self._data = bytes()
        return True

    def _extract_message(self):
        if len(self._data) < self._message_format_len:
            return False
        msg_num, msg_len = struct.unpack_from(self.MESSAGE_FORMAT, self._data)
        if len(self._data) < self._message_format_len + msg_len:
            return False
        # Keep exactly msg_len payload bytes. The buffer is reset, so this
        # assumes one message is fully reassembled before the next begins.
        self._message = self._data[
            self._message_format_len:self._message_format_len + msg_len]
        self._data = bytes()
        return True

    def add_data(self, data, ts):
        self._data += data
        self._ts.append(ts)
        if not self._handshake_done:
            if not self._extract_channel():
                return
            self._handshake_done = True
            self._ts = []
        if not self._extract_message():
            return
        # The payload is a boost serialization archive: after the archive
        # header, the message type name is stored as a uint64 length
        # followed by the name itself.
        message_type_size = struct.unpack_from(
            "Q", self._message, self.BOOST_OFFSET)[0]
        message_type = struct.unpack_from(
            "{}s".format(message_type_size), self._message,
            self.BOOST_OFFSET + 8)[0].decode("utf-8")
        if self._last is None:
            # Request: remember its type and the timestamp of its first
            # packet.
            self._last = (message_type, self._ts[0])
        else:
            # Response: record the request/response pair's duration and the
            # response size.
            req_type, req_ts = self._last
            duration = self._ts[-1] - req_ts
            self._stats[(req_type, message_type)]["duration"].append(duration)
            self._stats[(req_type, message_type)]["size"].append(
                len(self._message))
            self._last = None
        self._ts = []

    def get_stats(self):
        return self._stats


class Server:
    def __init__(self):
        self._conns = collections.defaultdict(Connection)

    def add_data(self, addr, data, ts):
        self._conns[addr].add_data(data, ts)

    def print_stats(self, machine_names, title, sort_by):
        # Aggregate per-connection stats by client IP.
        stats = collections.defaultdict(lambda: collections.defaultdict(
            lambda: {"duration": [], "size": []}))
        for addr, conn in self._conns.items():
            ip, port = addr.split(":")
            for rpc, connstats in conn.get_stats().items():
                stats[ip][rpc]["duration"] += connstats["duration"]
                stats[ip][rpc]["size"] += connstats["size"]
        table = []
        headers = ["RPC ({})".format(title), "Client", "Count",
                   "Tmin (ms)", "Tavg (ms)", "Tmax (ms)", "Ttot (s)",
                   "Smin (B)", "Savg (B)", "Smax (B)", "Stot (kiB)"]
        sort_keys = ["rpc", "client", "count", "tmin", "tavg", "tmax",
                     "ttot", "smin", "savg", "smax", "stot"]
        for client in sorted(stats.keys()):
            rpcs = stats[client]
            for rpc, connstats in rpcs.items():
                durs = connstats["duration"]
                sizes = connstats["size"]
                durs_sum = sum(durs)
                sizes_sum = sum(sizes)
                table.append(["{} / {}".format(*rpc),
                              machine_names[client],
                              len(durs),
                              min(durs) * 1000,
                              durs_sum / len(durs) * 1000,
                              max(durs) * 1000,
                              durs_sum,
                              min(sizes),
                              int(sizes_sum / len(sizes)),
                              max(sizes),
                              sizes_sum / 1024])
        # Python's sort is stable, so applying the keys from least to most
        # significant makes the first field in sort_by the primary key.
        for sort_field in reversed(sort_by.split(",")):
            reverse = sort_field.endswith("-")
            table.sort(key=operator.itemgetter(sort_keys.index(
                sort_field.rstrip("+-"))), reverse=reverse)
        print(tabulate.tabulate(table, headers=headers, tablefmt="psql",
                                floatfmt=".3f"))
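
# The cluster config file is expected to look roughly like the sketch below
# (reconstructed from the fields read in the process logic, i.e.
# "workload_machines", "type", "address", "port"; the values are made up
# and this is not an authoritative schema):
#
#   {
#       "workload_machines": [
#           {"type": "master", "address": "192.168.1.1", "port": 10000},
#           {"type": "worker", "address": "192.168.1.2", "port": 10000},
#           {"type": "worker", "address": "192.168.1.3", "port": 10000}
#       ]
#   }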

# process logic
parser = argparse.ArgumentParser(description="Generate RPC statistics from "
                                 "network traffic capture.")
parser.add_argument("--sort-by", default="tavg+,count-,client+",
                    help="comma separated list of fields used to sort the "
                    "data; each field can be suffixed with + or - to "
                    "indicate ascending or descending order; available "
                    "fields: rpc, client, count, tmin, tavg, tmax, ttot, "
                    "smin, savg, smax, stot")
parser.add_argument("capfile", help="network traffic capture file")
parser.add_argument("conffile", help="cluster config file")
args = parser.parse_args()

with open(args.conffile) as conffile:
    config = json.load(conffile)

last_worker = 0
machine_names = {}
server_addresses = set()
for machine in config["workload_machines"]:
    name = machine["type"]
    if name == "worker":
        last_worker += 1
        name += str(last_worker)
    machine_names[machine["address"]] = name
    server_addresses.add("{address}:{port}".format(**machine))

servers = collections.defaultdict(Server)
with open(args.capfile, "rb") as capfile:
    for ts, pkt in dpkt.pcap.Reader(capfile):
        eth = dpkt.ethernet.Ethernet(pkt)
        if eth.type != dpkt.ethernet.ETH_TYPE_IP:
            continue
        ip = eth.data
        if ip.p != dpkt.ip.IP_PROTO_TCP:
            continue
        tcp = ip.data
        src = format_endpoint(ip.src, tcp.sport)
        dst = format_endpoint(ip.dst, tcp.dport)
        # Only keep non-empty TCP payloads exchanged with a known server.
        if src not in server_addresses and dst not in server_addresses:
            continue
        if len(tcp.data) == 0:
            continue
        server = dst if dst in server_addresses else src
        client = dst if dst not in server_addresses else src
        servers[server].add_data(client, tcp.data, ts)

for server in sorted(servers.keys()):
    servers[server].print_stats(
        machine_names=machine_names,
        title=machine_names[server.split(":")[0]],
        sort_by=args.sort_by)
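
# Example invocation (the script and file names are placeholders):
#
#   ./rpc_stats.py --sort-by ttot-,client+ capture.pcap cluster.json
#
# One table is printed per server seen in the capture, sorted primarily by
# the first --sort-by field, with later fields breaking ties.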