#!/usr/bin/env python3
# example: python3 top_traffic.py -d 300 --resolve --server-ip 10.20.10.11 -i bond0 --delete-pcap
"""Capture traffic with tcpdump and report the heaviest talkers and time windows.

The script either records a pcap for a fixed duration or reads an existing
one, parses it with tshark (falling back to ``tcpdump -r``), aggregates bytes
per IP / pair / port / protocol / time bucket, prints a text report, writes
``.report.txt`` and ``.report.json`` next to the pcap and optionally drops into
a small interactive prompt.
"""
import argparse
import collections
import datetime as dt
import json
import math
import os
import re
import shlex
import shutil
import signal
import socket
import statistics
import subprocess
import sys
import time


# Matches the "length N" token printed by ``tcpdump -r``; compiled once
# because it is applied to every line of the dump.
_TCPDUMP_LENGTH_RE = re.compile(r"\blength\s+(\d+)\b")


def shell_join(cmd):
    """Return *cmd* (a list of arguments) as one shell-quoted string."""
    try:
        return shlex.join(cmd)
    except AttributeError:
        # shlex.join is not available on older interpreters.
        return " ".join(shlex.quote(str(part)) for part in cmd)


def run_compat(cmd, check=False):
    """Run *cmd*, capturing stdout/stderr as text.

    Args:
        cmd: argument list passed to ``subprocess.run``.
        check: when true, a non-zero exit raises ``CalledProcessError``.

    Returns:
        The ``CompletedProcess`` instance.
    """
    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    if check and proc.returncode != 0:
        raise subprocess.CalledProcessError(
            proc.returncode, cmd, output=proc.stdout, stderr=proc.stderr
        )
    return proc


class Packet(object):
    """One parsed packet: timestamp, length, endpoints, ports and protocol label."""

    __slots__ = ("ts", "length", "src", "dst", "sport", "dport", "proto")

    def __init__(self, ts, length, src, dst, sport="", dport="", proto=""):
        self.ts = ts
        self.length = length
        self.src = src
        self.dst = dst
        self.sport = sport
        self.dport = dport
        self.proto = proto


class BucketSummary(object):
    """Aggregated totals and top-3 lists for a single time bucket."""

    __slots__ = ("start_ts", "end_ts", "bytes_total", "packets_total", "top_ips", "top_pairs", "top_ports")

    def __init__(self, start_ts, end_ts, bytes_total, packets_total, top_ips, top_pairs, top_ports):
        self.start_ts = start_ts
        self.end_ts = end_ts
        self.bytes_total = bytes_total
        self.packets_total = packets_total
        self.top_ips = top_ips
        self.top_pairs = top_pairs
        self.top_ports = top_ports


class CaptureError(RuntimeError):
    """Raised when tcpdump could not be started or produced no usable capture."""


class AnalysisError(RuntimeError):
    """Raised when the capture cannot be parsed or analysed."""


def human_bytes(num):
    """Format a byte count with binary units (B, KiB, MiB, GiB, TiB).

    Plain bytes are shown as a truncated integer, larger units with two
    decimals; anything of 1024 GiB or more is expressed in TiB.
    """
    value = float(num)
    if value < 1024.0:
        return "{} B".format(int(value))
    for unit in ("KiB", "MiB", "GiB"):
        value /= 1024.0
        if value < 1024.0:
            return "{:.2f} {}".format(value, unit)
    return "{:.2f} TiB".format(value / 1024.0)


def human_bps(num_bytes_per_sec):
    """Format a bytes-per-second rate, e.g. ``1.50 KiB/s``."""
    return "{}/s".format(human_bytes(num_bytes_per_sec))


def ask(prompt, default=None):
    """Prompt on stdin and return the stripped answer, or *default* when empty."""
    suffix = " [{}]".format(default) if default not in (None, "") else ""
    value = input("{}{}: ".format(prompt, suffix)).strip()
    if not value and default is not None:
        return default
    return value


def ensure_binary(name):
    """Return the full path of executable *name* or raise ``RuntimeError``."""
    path = shutil.which(name)
    if not path:
        raise RuntimeError("Brakuje wymaganego programu: {}".format(name))
    return path


def detect_local_ips(interface_name=None):
    """Best-effort list of local addresses, read from ``ip -o addr show``.

    Args:
        interface_name: restrict the query to this device; ``None`` or
            ``"any"`` queries all devices.

    Returns:
        A set of address strings (prefix length stripped, IPv6 addresses
        starting with ``fe80:`` skipped). Empty when ``ip`` is missing or fails.
    """
    ips = set()
    ip_bin = shutil.which("ip")
    if not ip_bin:
        return ips
    cmd = [ip_bin, "-o", "addr", "show"]
    if interface_name and interface_name != "any":
        cmd.extend(["dev", interface_name])
    try:
        proc = run_compat(cmd)
    except (OSError, ValueError):
        # Detection is optional; the caller falls back to "both sides" mode.
        return ips
    if proc.returncode != 0:
        return ips
    for line in proc.stdout.splitlines():
        parts = line.split()
        for family in ("inet", "inet6"):
            if family not in parts:
                continue
            idx = parts.index(family)
            if idx + 1 >= len(parts):
                continue
            addr = parts[idx + 1].split("/", 1)[0]
            if family == "inet6" and addr.startswith("fe80:"):
                continue
            ips.add(addr)
    return ips


def normalize_local_ips(items):
    """Flatten repeated / comma-separated ``--server-ip`` values into a set."""
    result = set()
    if not items:
        return result
    for item in items:
        if not item:
            continue
        for part in str(item).split(","):
            value = part.strip()
            if value:
                result.add(value)
    return result


def select_interface():
    """Show the interfaces reported by ``tcpdump -D`` and ask which one to use."""
    try:
        tcpdump = ensure_binary("tcpdump")
        proc = run_compat([tcpdump, "-D"], check=True)
    except (RuntimeError, OSError, subprocess.CalledProcessError):
        print("\nNie udalo sie pobrac listy interfejsow z tcpdump -D.")
    else:
        print("\nDostepne interfejsy:")
        print(proc.stdout.strip() or "(brak listy od tcpdump -D)")
    return ask("Interfejs do nasluchu", "any")


def build_capture_cmd(tcpdump_path, iface, output_file, capture_filter, snaplen, buffer_kib, no_promisc):
    """Build the tcpdump argument list used for the capture.

    Raises:
        CaptureError: when *capture_filter* cannot be tokenised (for example
            because of an unbalanced quote).
    """
    cmd = [
        tcpdump_path,
        "-i", iface,
        "-s", str(snaplen),
        "-B", str(buffer_kib),
        "-U",
        "-w", output_file,
    ]
    if no_promisc:
        cmd.append("-p")
    if capture_filter:
        try:
            cmd.extend(shlex.split(capture_filter))
        except ValueError as exc:
            raise CaptureError("Niepoprawny filtr capture: {}".format(exc)) from exc
    return cmd


def _stop_capture(proc, timeout=20):
    """Ask tcpdump to finish via SIGINT and return its stderr; kill it if it hangs."""
    if proc.poll() is None:
        try:
            proc.send_signal(signal.SIGINT)
        except OSError:
            # The process exited between poll() and the signal.
            pass
    try:
        _, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        _, stderr = proc.communicate()
    return stderr or ""


def run_capture(iface, duration_s, capture_filter, output_file, snaplen, buffer_kib, no_promisc):
    """Record traffic with tcpdump for *duration_s* seconds.

    A progress line is redrawn twice a second. Ctrl-C stops the capture early
    and the data recorded so far is kept.

    Returns:
        *output_file*.

    Raises:
        RuntimeError: tcpdump is not installed.
        CaptureError: tcpdump exited with an unexpected code or the output
            file is missing / empty.
    """
    tcpdump = ensure_binary("tcpdump")
    cmd = build_capture_cmd(
        tcpdump_path=tcpdump,
        iface=iface,
        output_file=output_file,
        capture_filter=capture_filter,
        snaplen=snaplen,
        buffer_kib=buffer_kib,
        no_promisc=no_promisc,
    )
    print("\nStart capture:")
    print(" ", shell_join(cmd))
    print("\nZbieram ruch przez {} s...".format(duration_s))

    proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, universal_newlines=True)
    # Monotonic clock: the wait must not be affected by wall-clock adjustments.
    start = time.monotonic()
    spinner = "|/-\\"
    tick = 0
    interrupted = False
    try:
        while True:
            elapsed = time.monotonic() - start
            if elapsed >= duration_s:
                break
            ch = spinner[tick % len(spinner)]
            tick += 1
            left = max(0, int(duration_s - elapsed))
            sys.stdout.write(
                "\r{} elapsed={:>4}s left={:>4}s file={}".format(ch, int(elapsed), left, output_file)
            )
            sys.stdout.flush()
            time.sleep(0.5)
            if proc.poll() is not None:
                # tcpdump died on its own (bad interface, permissions, ...).
                break
    except KeyboardInterrupt:
        interrupted = True
    except Exception:
        proc.kill()
        proc.wait()  # reap the child so it does not linger as a zombie
        raise

    stderr = _stop_capture(proc)
    if interrupted:
        sys.stdout.write("\nPrzerwano przez uzytkownika. Analizuje to, co juz zapisano.\n")
    else:
        sys.stdout.write("\n")
    stderr_tail = [line for line in stderr.strip().splitlines() if line.strip()][-10:]

    if proc.returncode not in (0, 130):
        details = "\n".join(stderr_tail) if stderr_tail else "brak stderr"
        raise CaptureError("tcpdump zakonczyl sie kodem {}.\n{}".format(proc.returncode, details))
    if not os.path.exists(output_file) or os.path.getsize(output_file) == 0:
        details = "\n".join(stderr_tail) if stderr_tail else "Brak zapisanych pakietow."
        raise CaptureError(details)

    if stderr_tail:
        print("tcpdump info:")
        for line in stderr_tail:
            print(" ", line)
    print("Zapisano: {} ({})".format(output_file, human_bytes(os.path.getsize(output_file))))
    return output_file


def tshark_available():
    """Return True when a ``tshark`` executable is on PATH."""
    return shutil.which("tshark") is not None


def parse_packets_with_tshark(pcap_file):
    """Parse *pcap_file* with ``tshark -T fields`` into a list of ``Packet``.

    Rows without an IPv4/IPv6 source and destination, or with unparsable
    timestamp/length, are skipped.

    Raises:
        RuntimeError: tshark is not installed.
        AnalysisError: tshark exited with a non-zero code.
    """
    tshark = ensure_binary("tshark")
    cmd = [
        tshark, "-n", "-r", pcap_file,
        "-T", "fields",
        "-E", "separator=\t",
        "-E", "header=n",
        "-E", "occurrence=f",
        "-e", "frame.time_epoch",
        "-e", "frame.len",
        "-e", "ip.src",
        "-e", "ip.dst",
        "-e", "ipv6.src",
        "-e", "ipv6.dst",
        "-e", "tcp.srcport",
        "-e", "tcp.dstport",
        "-e", "udp.srcport",
        "-e", "udp.dstport",
        "-e", "_ws.col.Protocol",
    ]
    proc = run_compat(cmd)
    if proc.returncode != 0:
        raise AnalysisError(proc.stderr.strip() or "tshark zwrocil blad")

    packets = []
    for raw_line in proc.stdout.splitlines():
        parts = raw_line.split("\t")
        if len(parts) < 11:
            # Trailing empty fields may be missing; pad so unpacking works.
            parts.extend([""] * (11 - len(parts)))
        ts_raw, len_raw, ip_src, ip_dst, ipv6_src, ipv6_dst, tcp_s, tcp_d, udp_s, udp_d, proto = parts[:11]
        src = ip_src or ipv6_src
        dst = ip_dst or ipv6_dst
        if not src or not dst:
            continue
        sport = tcp_s or udp_s or ""
        dport = tcp_d or udp_d or ""
        try:
            packets.append(Packet(float(ts_raw), int(len_raw), src, dst, sport, dport, proto))
        except ValueError:
            continue
    return packets


def parse_packets_with_tcpdump(pcap_file):
    """Fallback parser: read *pcap_file* through ``tcpdump -nn -tt -r``.

    Only lines of family ``IP`` / ``IP6`` that carry a positive ``length N``
    token are kept; the protocol field of each packet is set to the family.

    NOTE(review): the "length" printed by tcpdump appears to be the payload
    size rather than the frame size, so totals from this parser are probably
    lower than those from tshark's ``frame.len`` - confirm against the
    tcpdump man page.

    Raises:
        RuntimeError: tcpdump is not installed.
        AnalysisError: tcpdump exited with a non-zero code.
    """
    tcpdump = ensure_binary("tcpdump")
    proc = run_compat([tcpdump, "-nn", "-tt", "-r", pcap_file])
    if proc.returncode != 0:
        raise AnalysisError(proc.stderr.strip() or "tcpdump -r zwrocil blad")

    packets = []
    for line in proc.stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            left, right = line.split(": ", 1)
            fields = left.split()
            if len(fields) < 4:
                continue
            ts = float(fields[0])
            family = fields[1]
            if family not in ("IP", "IP6"):
                continue
            arrow_pos = fields.index(">")
            src_raw = fields[arrow_pos - 1]
            dst_raw = fields[arrow_pos + 1].rstrip(":")
        except (ValueError, IndexError):
            # Line does not look like "<ts> IP <src> > <dst>: ..."; skip it.
            continue

        match = _TCPDUMP_LENGTH_RE.search(right)
        length = int(match.group(1)) if match else 0
        if length <= 0:
            continue

        is_ipv6 = family == "IP6"
        src, sport = split_host_port(src_raw, is_ipv6)
        dst, dport = split_host_port(dst_raw, is_ipv6)
        if src and dst:
            packets.append(Packet(ts, length, src, dst, sport, dport, family))
    return packets


def split_host_port(value, is_ipv6):
    """Split a tcpdump endpoint such as ``10.0.0.1.443`` into ``(host, port)``.

    tcpdump appends the port to the address with a dot. An address without a
    port (e.g. on ICMP lines) is returned unchanged with an empty port, so the
    last octet of a bare IPv4 address is never mistaken for a port.

    Args:
        value: endpoint text; a trailing ``:`` is ignored.
        is_ipv6: True when the line family is ``IP6``.

    Returns:
        ``(host, port)`` where *port* is ``""`` when none is present.
    """
    value = value.rstrip(":")
    if is_ipv6:
        # Only the part after the last ":" can contain dots: either
        # "<hextet>.<port>" (1 dot), an embedded IPv4 "a.b.c.d" (3 dots)
        # or an embedded IPv4 followed by a port (4 dots).
        dots = value.rsplit(":", 1)[-1].count(".")
        has_port = dots in (1, 4)
    else:
        # A dotted quad has 3 dots; a 4th dot introduces the port.
        has_port = value.count(".") >= 4
    if has_port:
        host, _, port = value.rpartition(".")
        if port.isdigit():
            return host, port
    return value, ""


def parse_packets(pcap_file):
    """Parse *pcap_file*, preferring tshark and falling back to tcpdump.

    Returns:
        ``(packets, parser_name)`` where *parser_name* is ``"tshark"`` or
        ``"tcpdump"``.
    """
    if tshark_available():
        try:
            return parse_packets_with_tshark(pcap_file), "tshark"
        except Exception as exc:
            # Deliberately broad: any tshark failure should trigger the fallback.
            print("[warn] tshark nie udal sie ({}), probuje fallback przez tcpdump -r".format(exc))
    return parse_packets_with_tcpdump(pcap_file), "tcpdump"


def reverse_resolve(ip, cache):
    """Return the reverse-DNS name of *ip* (or *ip* itself on failure), memoised in *cache*."""
    if ip in cache:
        return cache[ip]
    try:
        cache[ip] = socket.gethostbyaddr(ip)[0]
    except (OSError, UnicodeError):
        cache[ip] = ip
    return cache[ip]


def top_n(counter, limit):
    """Return the *limit* largest ``(key, value)`` pairs, ties ordered by key."""
    return sorted(counter.items(), key=lambda kv: (-kv[1], kv[0]))[:limit]


def validate_lengths(packets):
    """Sanity-check packet sizes and raise ``AnalysisError`` on absurd values."""
    if not packets:
        return
    max_len = max(p.length for p in packets)
    avg_len = sum(p.length for p in packets) / float(len(packets))
    if max_len > 10 ** 7 or avg_len > 10 ** 6:
        raise AnalysisError(
            "Parser zwrocil nierealne dlugosci pakietow (max={}, avg={:.1f}). "
            "To zwykle oznacza zly parse outputu tcpdump -r; sprobuj z nowsza wersja skryptu albo tshark."
            .format(max_len, avg_len)
        )


def analyze_packets(packets, bucket_seconds, top_ip_limit, top_pair_limit, local_ips=None):
    """Aggregate *packets* into per-host, per-pair, per-port and per-bucket totals.

    With *local_ips* given ("local mode") bytes are attributed to the remote
    side of each packet; traffic between two local addresses is tracked as
    ``internal_pairs`` and traffic touching no local address is counted as
    ``unmatched_bytes``. Without *local_ips* both endpoints of every packet
    are credited.

    Args:
        packets: iterable of ``Packet``.
        bucket_seconds: width of a time bucket; must be positive.
        top_ip_limit: length of the IP / port / local rankings.
        top_pair_limit: length of the pair rankings.
        local_ips: optional iterable of local addresses.

    Returns:
        A dict of totals, ranked lists of ``(key, bytes)``, all buckets sorted
        by volume (``buckets``) and the selected hot buckets (``interesting``).

    Raises:
        AnalysisError: when there are no packets or *bucket_seconds* <= 0.
    """
    if not packets:
        raise AnalysisError("Brak pakietow IP/IPv6 do analizy.")
    if bucket_seconds <= 0:
        raise AnalysisError("Bucket musi byc dodatni: {}".format(bucket_seconds))

    local_ips = set(local_ips or [])
    packets = sorted(packets, key=lambda p: p.ts)
    first_ts = packets[0].ts
    last_ts = packets[-1].ts
    # Floor at 1 ms so a single-packet capture does not divide by zero.
    duration = max(0.001, last_ts - first_ts)
    total_bytes = sum(p.length for p in packets)

    per_ip_total = collections.Counter()
    per_ip_in = collections.Counter()
    per_ip_out = collections.Counter()
    per_pair = collections.Counter()
    per_proto = collections.Counter()
    per_port_total = collections.Counter()
    per_port_src = collections.Counter()
    per_port_dst = collections.Counter()
    bucket_bytes = collections.Counter()
    bucket_packets = collections.Counter()
    bucket_ip_bytes = collections.defaultdict(collections.Counter)
    bucket_pair_bytes = collections.defaultdict(collections.Counter)
    bucket_port_bytes = collections.defaultdict(collections.Counter)
    local_total = collections.Counter()
    internal_pairs = collections.Counter()
    unmatched_bytes = 0

    for p in packets:
        pair = "{} -> {}".format(p.src, p.dst)
        ip_keys = ()       # IPs credited in the per-bucket ranking
        pair_key = None    # pair credited in the per-bucket ranking
        ports_track = False

        if local_ips:
            src_is_local = p.src in local_ips
            dst_is_local = p.dst in local_ips
            if src_is_local and dst_is_local:
                internal_pairs[pair] += p.length
                local_total[p.src] += p.length
                local_total[p.dst] += p.length
            elif src_is_local:
                # Outbound: credit the remote destination.
                per_ip_total[p.dst] += p.length
                per_ip_out[p.dst] += p.length
                per_pair[pair] += p.length
                local_total[p.src] += p.length
                ip_keys = (p.dst,)
                pair_key = pair
                ports_track = True
            elif dst_is_local:
                # Inbound: credit the remote source.
                per_ip_total[p.src] += p.length
                per_ip_in[p.src] += p.length
                per_pair[pair] += p.length
                local_total[p.dst] += p.length
                ip_keys = (p.src,)
                pair_key = pair
                ports_track = True
            else:
                unmatched_bytes += p.length
        else:
            per_ip_total[p.src] += p.length
            per_ip_total[p.dst] += p.length
            per_ip_out[p.src] += p.length
            per_ip_in[p.dst] += p.length
            per_pair[pair] += p.length
            ip_keys = (p.src, p.dst)
            pair_key = pair
            ports_track = True

        if p.proto:
            per_proto[p.proto] += p.length

        idx = int((p.ts - first_ts) // bucket_seconds)
        bucket_bytes[idx] += p.length
        bucket_packets[idx] += 1

        if ports_track:
            if p.sport:
                sport = str(p.sport)
                per_port_src[sport] += p.length
                per_port_total[sport] += p.length
                bucket_port_bytes[idx]["src:{}".format(sport)] += p.length
            if p.dport:
                dport = str(p.dport)
                per_port_dst[dport] += p.length
                per_port_total[dport] += p.length
                bucket_port_bytes[idx]["dst:{}".format(dport)] += p.length

        for ip_key in ip_keys:
            bucket_ip_bytes[idx][ip_key] += p.length
        if pair_key is not None:
            bucket_pair_bytes[idx][pair_key] += p.length

    buckets = []
    for idx, total in bucket_bytes.items():
        start_ts = first_ts + idx * bucket_seconds
        buckets.append(
            BucketSummary(
                start_ts=start_ts,
                end_ts=start_ts + bucket_seconds,
                bytes_total=total,
                packets_total=bucket_packets[idx],
                top_ips=top_n(bucket_ip_bytes[idx], 3),
                top_pairs=top_n(bucket_pair_bytes[idx], 3),
                top_ports=top_n(bucket_port_bytes[idx], 3),
            )
        )
    buckets.sort(key=lambda b: (-b.bytes_total, b.start_ts))

    bucket_values = list(bucket_bytes.values())
    avg_bucket = statistics.mean(bucket_values)
    p95_bucket = quantile(bucket_values, 0.95)

    # A bucket is "interesting" when it reaches both the p95 and 1.8x the mean.
    threshold = max(p95_bucket, avg_bucket * 1.8)
    interesting = []
    used_windows = []
    for b in buckets:
        if b.bytes_total < threshold:
            continue
        # Guard against reporting two windows that start less than one bucket apart.
        if any(abs(b.start_ts - prev_start) < bucket_seconds for prev_start, _ in used_windows):
            continue
        used_windows.append((b.start_ts, b.end_ts))
        interesting.append(b)
        if len(interesting) >= 5:
            break
    if not interesting:
        # Nothing stands out: fall back to the three largest buckets.
        interesting = buckets[: min(3, len(buckets))]

    matched_total = sum(per_ip_total.values()) if local_ips else total_bytes
    return {
        "first_ts": first_ts,
        "last_ts": last_ts,
        "duration": duration,
        "packet_count": len(packets),
        "total_bytes": total_bytes,
        "matched_total_bytes": matched_total,
        "unmatched_bytes": unmatched_bytes,
        "local_ips": sorted(local_ips),
        "local_total": top_n(local_total, top_ip_limit),
        "internal_pairs": top_n(internal_pairs, top_pair_limit),
        "avg_rate_Bps": total_bytes / duration,
        "per_ip_total": top_n(per_ip_total, top_ip_limit),
        "per_ip_in": top_n(per_ip_in, top_ip_limit),
        "per_ip_out": top_n(per_ip_out, top_ip_limit),
        "per_pair": top_n(per_pair, top_pair_limit),
        "per_proto": top_n(per_proto, 10),
        "per_port_total": top_n(per_port_total, top_ip_limit),
        "per_port_src": top_n(per_port_src, top_ip_limit),
        "per_port_dst": top_n(per_port_dst, top_ip_limit),
        "buckets": buckets,
        "interesting": interesting,
        "avg_bucket": avg_bucket,
        "p95_bucket": p95_bucket,
        "local_mode": bool(local_ips),
    }


def quantile(values, q):
    """Return the *q* quantile of *values* using linear interpolation (0.0 when empty)."""
    if not values:
        return 0.0
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q
    lo = int(math.floor(pos))
    hi = int(math.ceil(pos))
    if lo == hi:
        return float(ordered[lo])
    frac = pos - lo
    return ordered[lo] * (1 - frac) + ordered[hi] * frac


def fmt_ts(epoch):
    """Format an epoch timestamp as local ``YYYY-MM-DD HH:MM:SS``."""
    return dt.datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M:%S")


def _ip_labeler(resolve):
    """Return a function mapping an IP to its display label (reverse DNS when *resolve*)."""
    cache = {}

    def label(ip):
        if not resolve:
            return ip
        name = reverse_resolve(ip, cache)
        return ip if name == ip else "{} ({})".format(name, ip)

    return label


def _moment_line(position, bucket):
    """Format the one-line summary of a hot bucket (window, volume, rate, packets)."""
    span = max(1.0, bucket.end_ts - bucket.start_ts)
    return "{}. {} .. {} | {} | {} | pakiety={}".format(
        position,
        fmt_ts(bucket.start_ts),
        fmt_ts(bucket.end_ts),
        human_bytes(bucket.bytes_total),
        human_bps(bucket.bytes_total / span),
        bucket.packets_total,
    )


def render_report(stats, resolve=False):
    """Render the dict returned by ``analyze_packets`` as a multi-line text report.

    Args:
        stats: result of ``analyze_packets``.
        resolve: when true, IPs are shown with their reverse-DNS name.

    Returns:
        The report as a single string.
    """
    pretty_ip = _ip_labeler(resolve)
    local_mode = stats.get("local_mode")

    lines = []
    lines.append("=" * 88)
    lines.append("PODSUMOWANIE RUCHU")
    lines.append("=" * 88)
    lines.append("Okno: {} .. {}".format(fmt_ts(stats["first_ts"]), fmt_ts(stats["last_ts"])))
    lines.append("Czas: {:.1f} s".format(stats["duration"]))
    lines.append("Pakiety: {}".format(stats["packet_count"]))
    lines.append("Wolumen: {}".format(human_bytes(stats["total_bytes"])))
    lines.append("Srednia: {}".format(human_bps(stats["avg_rate_Bps"])))
    lines.append("Bucket avg: {}".format(human_bytes(stats["avg_bucket"])))
    lines.append("Bucket p95: {}".format(human_bytes(stats["p95_bucket"])))
    lines.append(
        "Sredni pakiet: {}".format(
            human_bytes(stats["total_bytes"] / float(max(1, stats["packet_count"])))
        )
    )
    if local_mode:
        lines.append("Lokalne IP: {}".format(", ".join(stats.get("local_ips", [])) or "-"))
        lines.append(
            "Ruch zmapowany do zdalnych hostow: {}".format(
                human_bytes(stats.get("matched_total_bytes", 0))
            )
        )
        if stats.get("unmatched_bytes", 0):
            lines.append(
                "Ruch poza lokalnymi IP: {}".format(human_bytes(stats.get("unmatched_bytes", 0)))
            )
    lines.append("")

    def section(title, items, formatter=None, show_avg=False):
        """Append one ranked table; *show_avg* adds the average rate column."""
        lines.append(title)
        lines.append("-" * len(title))
        count = 0
        for key, value in items:
            count += 1
            label = formatter(key) if formatter else key
            if show_avg:
                avg = value / max(0.001, stats["duration"])
                lines.append(
                    "{:>2}. {:<52} {:>12} avg={:>12}".format(
                        count, label[:52], human_bytes(value), human_bps(avg)
                    )
                )
            else:
                lines.append("{:>2}. {:<52} {:>12}".format(count, label[:52], human_bytes(value)))
        if count == 0:
            lines.append("(brak danych)")
        lines.append("")

    section(
        "Top zdalne IP lacznie" if local_mode else "Top IP lacznie",
        stats["per_ip_total"], pretty_ip, show_avg=True,
    )
    section(
        "Top zdalne IP inbound do serwera" if local_mode else "Top IP inbound do serwera",
        stats["per_ip_in"], pretty_ip, show_avg=True,
    )
    section(
        "Top zdalne IP outbound z serwera" if local_mode else "Top IP outbound z serwera",
        stats["per_ip_out"], pretty_ip, show_avg=True,
    )
    section(
        "Top pary serwer <-> zdalny host" if local_mode else "Top pary src -> dst",
        stats["per_pair"],
    )
    section("Top porty lacznie", stats["per_port_total"])
    section("Top porty source", stats["per_port_src"])
    section("Top porty destination", stats["per_port_dst"])
    section("Top protokoly", stats["per_proto"])

    lines.append("Najciekawsze momenty")
    lines.append("-" * len("Najciekawsze momenty"))
    for idx, bucket in enumerate(stats["interesting"], start=1):
        top_ips = ", ".join(
            "{} [{}]".format(pretty_ip(ip), human_bytes(b)) for ip, b in bucket.top_ips
        )
        top_pairs = ", ".join(
            "{} [{}]".format(pair, human_bytes(b)) for pair, b in bucket.top_pairs
        )
        top_ports = ", ".join(
            "{} [{}]".format(port, human_bytes(b)) for port, b in bucket.top_ports
        )
        lines.append(_moment_line(idx, bucket))
        lines.append(" Top IP: {}".format(top_ips or "-"))
        lines.append(" Top pair: {}".format(top_pairs or "-"))
        lines.append(" Top port: {}".format(top_ports or "-"))
        lines.append("")
    return "\n".join(lines)


def _print_ranked(title, items, width):
    """Print *title* followed by numbered ``(key, bytes)`` rows, keys cut to *width*."""
    print(title)
    row = " {{:>2}}. {{:<{0}}} {{:>12}}".format(width)
    for i, (key, b) in enumerate(items, start=1):
        print(row.format(i, key[:width], human_bytes(b)))


def interactive_shell(stats, resolve):
    """Run the ``traffic>`` prompt for exploring *stats* until quit / EOF / Ctrl-C.

    Commands: ``top``, ``ip <IP>``, ``moments``, ``pairs``, ``ports``, ``quit``.
    """
    pretty = _ip_labeler(resolve)
    # Only the ranked top-N lists are available here, so "ip X" reports zero
    # for hosts that did not make it into those rankings.
    totals = dict(stats["per_ip_total"])
    inbound = dict(stats["per_ip_in"])
    outbound = dict(stats["per_ip_out"])

    print("Tryb interaktywny. Komendy: top, ip <IP>, moments, pairs, ports, quit")
    while True:
        try:
            raw = input("traffic> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            return
        if not raw:
            continue
        if raw in {"q", "quit", "exit"}:
            return
        if raw == "top":
            print(render_report(stats, resolve=resolve))
            continue
        if raw == "moments":
            print("Najciekawsze momenty:")
            for i, bucket in enumerate(stats["interesting"], start=1):
                print(" " + _moment_line(i, bucket))
            continue
        if raw == "pairs":
            _print_ranked("Top pary:", stats["per_pair"], 52)
            continue
        if raw == "ports":
            _print_ranked("Top porty lacznie:", stats["per_port_total"], 20)
            _print_ranked("Top porty source:", stats["per_port_src"], 20)
            _print_ranked("Top porty destination:", stats["per_port_dst"], 20)
            continue
        if raw.startswith("ip "):
            ip = raw[3:].strip()
            span = max(0.001, stats["duration"])
            print("{}".format(pretty(ip)))
            print(" total: {}".format(human_bytes(totals.get(ip, 0))))
            print(" inbound: {}".format(human_bytes(inbound.get(ip, 0))))
            print(" outbound: {}".format(human_bytes(outbound.get(ip, 0))))
            print(" avg total: {}".format(human_bps(totals.get(ip, 0) / span)))
            print(" avg inbound: {}".format(human_bps(inbound.get(ip, 0) / span)))
            print(" avg outbound: {}".format(human_bps(outbound.get(ip, 0) / span)))
            continue
        print("Nieznana komenda. Uzyj: top, ip <IP>, moments, pairs, ports, quit")


def _positive_int(text):
    """argparse ``type=`` callable accepting only integers greater than zero."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError("oczekiwano liczby calkowitej: {!r}".format(text))
    if value <= 0:
        raise argparse.ArgumentTypeError("wartosc musi byc wieksza od zera: {}".format(value))
    return value


def parse_args(argv=None):
    """Parse command-line options.

    Args:
        argv: argument list; ``None`` (default) uses ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace``. Invalid input exits via argparse.
    """
    p = argparse.ArgumentParser(description="Capture + hotspot report for tcpdump traffic windows")
    p.add_argument("-i", "--interface", help="interfejs, np. eth0 albo any")
    p.add_argument("-d", "--duration", type=_positive_int, default=900,
                   help="czas capture w sekundach, domyslnie 900")
    p.add_argument("-f", "--filter", default="", help="capture filter tcpdump/libpcap")
    p.add_argument("-o", "--output", help="plik .pcap/.pcapng")
    p.add_argument("--read", help="analizuj istniejacy plik .pcap/.pcapng zamiast robic capture")
    p.add_argument("--delete-pcap", action="store_true", help="usun plik .pcap po zapisaniu raportow")
    p.add_argument("--server-ip", action="append",
                   help="lokalne IP serwera; mozna podac wiele razy albo lista po przecinku")
    p.add_argument("--bucket", type=_positive_int, default=30,
                   help="bucket czasowy dla spike, domyslnie 30 s")
    p.add_argument("--top", type=int, default=15, help="ile IP pokazac w topkach")
    p.add_argument("--top-pairs", type=int, default=15, help="ile par src->dst pokazac")
    p.add_argument("--snaplen", type=int, default=0, help="snaplen dla tcpdump; 0 = pelny pakiet")
    p.add_argument("--buffer-kib", type=int, default=8192, help="buffer tcpdump w KiB")
    p.add_argument("--no-promisc", action="store_true", help="dodaj -p do tcpdump")
    p.add_argument("--resolve", action="store_true", help="reverse DNS dla IP w raporcie")
    p.add_argument("--no-interactive", action="store_true", help="po raporcie nie odpalaj prompta")
    return p.parse_args(argv)


def _json_payload(stats):
    """Select the JSON-serialisable part of *stats* written to ``.report.json``."""
    return {
        "summary": {
            "first_ts": stats["first_ts"],
            "last_ts": stats["last_ts"],
            "duration": stats["duration"],
            "packet_count": stats["packet_count"],
            "total_bytes": stats["total_bytes"],
            "avg_rate_Bps": stats["avg_rate_Bps"],
            "local_ips": stats.get("local_ips", []),
            "matched_total_bytes": stats.get("matched_total_bytes", 0),
            "unmatched_bytes": stats.get("unmatched_bytes", 0),
        },
        "per_ip_total": stats["per_ip_total"],
        "per_ip_in": stats["per_ip_in"],
        "per_ip_out": stats["per_ip_out"],
        "per_pair": stats["per_pair"],
        "per_proto": stats["per_proto"],
        "per_port_total": stats["per_port_total"],
        "per_port_src": stats["per_port_src"],
        "per_port_dst": stats["per_port_dst"],
        "interesting": [
            {
                "start_ts": x.start_ts,
                "end_ts": x.end_ts,
                "bytes_total": x.bytes_total,
                "packets_total": x.packets_total,
                "top_ips": x.top_ips,
                "top_pairs": x.top_pairs,
                "top_ports": x.top_ports,
            }
            for x in stats["interesting"]
        ],
    }


def main(argv=None):
    """Entry point: capture or read a pcap, analyse it and write the reports.

    Args:
        argv: argument list; ``None`` (default) uses ``sys.argv[1:]``.

    Returns:
        0 on success, 2 when a handled error was reported on stderr.
    """
    args = parse_args(argv)
    try:
        local_ips = normalize_local_ips(args.server_ip)

        if args.read:
            pcap_path = os.path.abspath(args.read)
            if not os.path.exists(pcap_path):
                raise RuntimeError("Nie istnieje plik do analizy: {}".format(pcap_path))
            print("Tryb read: {}".format(pcap_path))
            if local_ips:
                print("Lokalne IP z parametru: {}".format(", ".join(sorted(local_ips))))
            else:
                print("[warn] Brak --server-ip. Ranking IP bedzie liczony dla obu stron ruchu, wiec lokalne IP serwera moze dominowac.")
        else:
            if not args.interface:
                args.interface = select_interface()
            if not args.output:
                stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
                args.output = os.path.abspath("traffic_{}_{}.pcap".format(args.interface, stamp))
            if not local_ips:
                auto_local_ips = detect_local_ips(args.interface)
                if auto_local_ips:
                    local_ips = set(auto_local_ips)
                    print("Auto-detekcja lokalnych IP: {}".format(", ".join(sorted(local_ips))))
                else:
                    print("[warn] Nie udalo sie wykryc lokalnych IP. Ranking IP bedzie liczony dla obu stron ruchu.")
            pcap_path = args.output
            run_capture(
                iface=args.interface,
                duration_s=args.duration,
                capture_filter=args.filter,
                output_file=pcap_path,
                snaplen=args.snaplen,
                buffer_kib=args.buffer_kib,
                no_promisc=args.no_promisc,
            )

        packets, parser_name = parse_packets(pcap_path)
        print("Analiza parserem: {}".format(parser_name))
        validate_lengths(packets)
        stats = analyze_packets(
            packets=packets,
            bucket_seconds=args.bucket,
            top_ip_limit=args.top,
            top_pair_limit=args.top_pairs,
            local_ips=local_ips,
        )

        report = render_report(stats, resolve=args.resolve)
        print("\n" + report)

        base = os.path.splitext(pcap_path)[0]
        report_path = base + ".report.txt"
        json_path = base + ".report.json"
        with open(report_path, "w", encoding="utf-8") as fh:
            fh.write(report + "\n")
        with open(json_path, "w", encoding="utf-8") as fh:
            json.dump(_json_payload(stats), fh, ensure_ascii=False, indent=2)
        print("Raport TXT: {}".format(report_path))
        print("Raport JSON: {}".format(json_path))

        if args.delete_pcap:
            try:
                os.remove(pcap_path)
                print("Usunieto PCAP: {}".format(pcap_path))
            except OSError as exc:
                print("[warn] Nie udalo sie usunac PCAP {}: {}".format(pcap_path, exc), file=sys.stderr)

        if not args.no_interactive:
            interactive_shell(stats, resolve=args.resolve)
        return 0
    except (RuntimeError, OSError) as exc:
        # CaptureError and AnalysisError are RuntimeError subclasses; OSError
        # covers failures to start the tools or to write the report files.
        print("[blad] {}".format(exc), file=sys.stderr)
        return 2


if __name__ == "__main__":
    raise SystemExit(main())