Files
tools_scripts/top_traffic.py
2026-04-03 12:32:19 +02:00

885 lines
32 KiB
Python

#!/usr/bin/env python3
# example: python3 top_traffic.py -d 300 --resolve --server-ip 10.20.10.11 -i bond0 --delete-pcap
import argparse
import collections
import datetime as dt
import json
import math
import os
import re
import shlex
import shutil
import signal
import socket
import statistics
import subprocess
import sys
import time
def shell_join(cmd):
    """Render an argv list as one shell-quoted string (for display).

    Uses ``shlex.join`` when the running interpreter provides it; otherwise
    every element is converted to ``str``, quoted with ``shlex.quote`` and
    joined with single spaces.
    """
    joiner = getattr(shlex, "join", None)
    if joiner is not None:
        return joiner(cmd)
    quoted = [shlex.quote(str(part)) for part in cmd]
    return " ".join(quoted)
def run_compat(cmd, check=False):
    """Run *cmd* to completion, capturing stdout and stderr as text.

    Args:
        cmd: argv list handed to ``subprocess.run``.
        check: when true, a non-zero exit status raises
            ``subprocess.CalledProcessError`` carrying the captured output.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.
    """
    capture_options = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "universal_newlines": True,
    }
    completed = subprocess.run(cmd, **capture_options)
    failed = completed.returncode != 0
    if failed and check:
        raise subprocess.CalledProcessError(
            completed.returncode,
            cmd,
            output=completed.stdout,
            stderr=completed.stderr,
        )
    return completed
class Packet:
    """One parsed packet: timestamp, size, endpoints, ports and protocol label."""

    __slots__ = ("ts", "length", "src", "dst", "sport", "dport", "proto")

    def __init__(self, ts, length, src, dst, sport="", dport="", proto=""):
        # Timing and size.
        self.ts, self.length = ts, length
        # Endpoint addresses.
        self.src, self.dst = src, dst
        # Optional transport ports and protocol label; empty string when absent.
        self.sport, self.dport, self.proto = sport, dport, proto
class BucketSummary:
    """Totals and top talkers for a single time bucket."""

    __slots__ = ("start_ts", "end_ts", "bytes_total", "packets_total", "top_ips", "top_pairs", "top_ports")

    def __init__(self, start_ts, end_ts, bytes_total, packets_total, top_ips, top_pairs, top_ports):
        # Bucket time window.
        self.start_ts, self.end_ts = start_ts, end_ts
        # Volume carried inside the window.
        self.bytes_total, self.packets_total = bytes_total, packets_total
        # Ranked (key, bytes) lists for this window.
        self.top_ips, self.top_pairs, self.top_ports = top_ips, top_pairs, top_ports
class CaptureError(RuntimeError):
    """Raised when the tcpdump capture fails or produces no usable file."""
class AnalysisError(RuntimeError):
    """Raised when a capture file cannot be parsed or analysed."""
def human_bytes(num):
    """Format a byte count using binary units (B, KiB, MiB, GiB, TiB).

    Plain bytes are printed as a truncated integer; larger units use two
    decimals. Values beyond the TiB range stay expressed in TiB.
    """
    units = ("B", "KiB", "MiB", "GiB", "TiB")
    last = len(units) - 1
    value = float(num)
    step = 0
    # Scale down until the value fits below 1024 or the largest unit is reached.
    while step < last and not value < 1024.0:
        value /= 1024.0
        step += 1
    if step == 0:
        return "{} {}".format(int(value), units[0])
    return "{:.2f} {}".format(value, units[step])
def human_bps(num_bytes_per_sec):
    """Format a bytes-per-second rate: the ``human_bytes`` text followed by ``/s``."""
    amount = human_bytes(num_bytes_per_sec)
    return "{}/s".format(amount)
def ask(prompt, default=None):
    """Prompt on stdin and return the stripped answer.

    A non-empty *default* is shown in square brackets after the prompt. An
    empty answer yields *default* unless *default* is ``None``, in which case
    the empty string is returned.
    """
    hint = "" if default in (None, "") else " [{}]".format(default)
    answer = input("{}{}: ".format(prompt, hint)).strip()
    if answer:
        return answer
    return answer if default is None else default
def ensure_binary(name):
    """Return the path of executable *name* as found by ``shutil.which``.

    Raises:
        RuntimeError: when the program cannot be found.
    """
    located = shutil.which(name)
    if located:
        return located
    raise RuntimeError("Brakuje wymaganego programu: {}".format(name))
def detect_local_ips(interface_name=None):
    """Best-effort discovery of local addresses via ``ip -o addr show``.

    Args:
        interface_name: restrict the query to this device; ``None``, empty or
            ``"any"`` queries all devices.

    Returns:
        A set of address strings with the prefix length removed. IPv6
        addresses starting with ``fe80:`` are skipped. The set is empty when
        the ``ip`` tool is missing, exits non-zero, or anything goes wrong.
    """
    found = set()
    ip_tool = shutil.which("ip")
    if not ip_tool:
        return found
    argv = [ip_tool, "-o", "addr", "show"]
    if interface_name and interface_name != "any":
        argv += ["dev", interface_name]
    try:
        result = run_compat(argv)
        if result.returncode != 0:
            return found
        for row in result.stdout.splitlines():
            tokens = row.split()
            for family in ("inet", "inet6"):
                if family not in tokens:
                    continue
                # The address follows the family keyword on the same row.
                pos = tokens.index(family) + 1
                if pos >= len(tokens):
                    continue
                address = tokens[pos].split("/", 1)[0]
                if family == "inet6" and address.startswith("fe80:"):
                    continue
                found.add(address)
    except Exception:
        # Deliberately best-effort: return whatever was collected so far.
        return found
    return found
def normalize_local_ips(items):
    """Flatten repeated and/or comma-separated address values into a set.

    Falsy entries are ignored, every entry is split on commas and the pieces
    are stripped; empty pieces are dropped. ``None`` or an empty input gives
    an empty set.
    """
    if not items:
        return set()
    # Joining with "," and splitting again yields exactly the per-item pieces.
    joined = ",".join(str(item) for item in items if item)
    return {piece.strip() for piece in joined.split(",") if piece.strip()}
def select_interface():
    """Show the interface list from ``tcpdump -D`` and ask which one to use.

    Any failure while listing interfaces is reported with a short message and
    the prompt is shown anyway; the prompt's default answer is ``"any"``.
    """
    try:
        listing = run_compat([ensure_binary("tcpdump"), "-D"], check=True)
        print("\nDostepne interfejsy:")
        print(listing.stdout.strip() or "(brak listy od tcpdump -D)")
    except Exception:
        print("\nNie udalo sie pobrac listy interfejsow z tcpdump -D.")
    return ask("Interfejs do nasluchu", "any")
def build_capture_cmd(tcpdump_path, iface, output_file, capture_filter, snaplen, buffer_kib, no_promisc):
    """Assemble the tcpdump argv used for a capture.

    The result is ``-i iface -s snaplen -B buffer_kib -U -w output_file``,
    followed by ``-p`` when *no_promisc* is set and by the shell-split words
    of *capture_filter* when it is non-empty.
    """
    argv = [tcpdump_path]
    argv += ["-i", iface]
    argv += ["-s", str(snaplen)]
    argv += ["-B", str(buffer_kib)]
    argv.append("-U")
    argv += ["-w", output_file]
    if no_promisc:
        argv.append("-p")
    if capture_filter:
        # The filter arrives as one string; tcpdump receives it word by word.
        argv += shlex.split(capture_filter)
    return argv
def _finish_capture(proc, grace_s=20):
    """Stop tcpdump, reap it, and return its last non-empty stderr lines.

    SIGINT is sent only if the process is still running. If tcpdump does not
    exit within *grace_s* seconds it is killed, so the caller never has to
    deal with ``subprocess.TimeoutExpired`` or a leftover child process.
    """
    if proc.poll() is None:
        proc.send_signal(signal.SIGINT)
    try:
        _, stderr = proc.communicate(timeout=grace_s)
    except subprocess.TimeoutExpired:
        proc.kill()
        _, stderr = proc.communicate()
    if not stderr:
        return []
    return [line for line in stderr.strip().splitlines() if line.strip()][-10:]


def run_capture(iface, duration_s, capture_filter, output_file, snaplen, buffer_kib, no_promisc):
    """Capture traffic with tcpdump into *output_file* for *duration_s* seconds.

    A progress line is refreshed twice per second. Ctrl-C ends the capture
    early and keeps whatever has been written so far.

    Args:
        iface: interface name passed to ``tcpdump -i``.
        duration_s: capture time in seconds.
        capture_filter: libpcap filter expression, may be empty.
        output_file: path of the capture file to write.
        snaplen: value for ``tcpdump -s``.
        buffer_kib: value for ``tcpdump -B``.
        no_promisc: add ``-p`` to the tcpdump command line.

    Returns:
        *output_file*.

    Raises:
        RuntimeError: tcpdump is not installed.
        CaptureError: tcpdump exited with a status other than 0/130 (this
            includes the case where it had to be killed), or the capture file
            is missing or empty.
    """
    tcpdump = ensure_binary("tcpdump")
    cmd = build_capture_cmd(
        tcpdump_path=tcpdump,
        iface=iface,
        output_file=output_file,
        capture_filter=capture_filter,
        snaplen=snaplen,
        buffer_kib=buffer_kib,
        no_promisc=no_promisc,
    )
    print("\nStart capture:")
    print(" ", shell_join(cmd))
    print("\nZbieram ruch przez {} s...".format(duration_s))
    proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, universal_newlines=True)
    start = time.time()
    spinner = "|/-\\"
    idx = 0
    stderr_tail = []
    try:
        while True:
            elapsed = time.time() - start
            if elapsed >= duration_s:
                break
            ch = spinner[idx % len(spinner)]
            idx += 1
            left = max(0, int(duration_s - elapsed))
            sys.stdout.write("\r{} elapsed={:>4}s left={:>4}s file={}".format(ch, int(elapsed), left, output_file))
            sys.stdout.flush()
            time.sleep(0.5)
            # tcpdump exiting on its own (bad filter, no permissions, ...) ends the wait.
            if proc.poll() is not None:
                break
        stderr_tail = _finish_capture(proc)
        sys.stdout.write("\n")
    except KeyboardInterrupt:
        stderr_tail = _finish_capture(proc)
        sys.stdout.write("\nPrzerwano przez uzytkownika. Analizuje to, co juz zapisano.\n")
    except Exception:
        # Unexpected failure: make sure no tcpdump process is left behind.
        proc.kill()
        proc.wait()
        raise
    if proc.returncode not in (0, 130):
        details = "\n".join(stderr_tail) if stderr_tail else "brak stderr"
        raise CaptureError("tcpdump zakonczyl sie kodem {}.\n{}".format(proc.returncode, details))
    if not os.path.exists(output_file) or os.path.getsize(output_file) == 0:
        details = "\n".join(stderr_tail) if stderr_tail else "Brak zapisanych pakietow."
        raise CaptureError(details)
    if stderr_tail:
        print("tcpdump info:")
        for line in stderr_tail:
            print(" ", line)
    print("Zapisano: {} ({})".format(output_file, human_bytes(os.path.getsize(output_file))))
    return output_file
def tshark_available():
    """Return ``True`` when a ``tshark`` executable is found by ``shutil.which``."""
    return bool(shutil.which("tshark"))
def parse_packets_with_tshark(pcap_file):
    """Parse *pcap_file* with tshark's tab-separated field output.

    Rows without both a source and a destination address (IPv4 or IPv6), and
    rows whose timestamp or length cannot be converted, are skipped. Ports
    come from the TCP fields when present, otherwise from the UDP fields.

    Returns:
        A list of ``Packet`` objects.

    Raises:
        RuntimeError: tshark is not installed.
        AnalysisError: tshark exited with a non-zero status.
    """
    tshark = ensure_binary("tshark")
    field_names = (
        "frame.time_epoch",
        "frame.len",
        "ip.src",
        "ip.dst",
        "ipv6.src",
        "ipv6.dst",
        "tcp.srcport",
        "tcp.dstport",
        "udp.srcport",
        "udp.dstport",
        "_ws.col.Protocol",
    )
    cmd = [tshark, "-n", "-r", pcap_file, "-T", "fields"]
    for option in ("separator=\t", "header=n", "occurrence=f"):
        cmd += ["-E", option]
    for field in field_names:
        cmd += ["-e", field]
    proc = run_compat(cmd)
    if proc.returncode != 0:
        raise AnalysisError(proc.stderr.strip() or "tshark zwrocil blad")
    width = len(field_names)
    packets = []
    for row in proc.stdout.splitlines():
        columns = row.split("\t")
        # Pad short rows so the unpacking below always sees every column.
        columns += [""] * (width - len(columns))
        (ts_raw, len_raw, ip_src, ip_dst, ipv6_src, ipv6_dst,
         tcp_s, tcp_d, udp_s, udp_d, proto) = columns[:width]
        src = ip_src or ipv6_src
        dst = ip_dst or ipv6_dst
        if not (src and dst):
            continue
        try:
            ts = float(ts_raw)
            length = int(len_raw)
        except ValueError:
            continue
        packets.append(Packet(ts, length, src, dst, tcp_s or udp_s or "", tcp_d or udp_d or "", proto))
    return packets
def parse_packets_with_tcpdump(pcap_file):
    """Fallback parser: read *pcap_file* through ``tcpdump -nn -tt -r``.

    Only lines whose second token is ``IP`` or ``IP6`` are used. The packet
    size is the first ``length N`` found after the address header; lines
    without a positive length are skipped. The ``IP``/``IP6`` token is stored
    as the packet's protocol label.

    Returns:
        A list of ``Packet`` objects.

    Raises:
        RuntimeError: tcpdump is not installed.
        AnalysisError: ``tcpdump -r`` exited with a non-zero status.
    """
    tcpdump = ensure_binary("tcpdump")
    proc = run_compat([tcpdump, "-nn", "-tt", "-r", pcap_file])
    if proc.returncode != 0:
        raise AnalysisError(proc.stderr.strip() or "tcpdump -r zwrocil blad")
    length_re = re.compile(r"\blength\s+(\d+)\b")
    packets = []
    for raw in proc.stdout.splitlines():
        text = raw.strip()
        if not text:
            continue
        try:
            # "<ts> <family> <src> > <dst>" precedes the first ": ".
            header, detail = text.split(": ", 1)
            tokens = header.split()
            if len(tokens) < 4:
                continue
            ts = float(tokens[0])
            family = tokens[1]
            if family not in ("IP", "IP6"):
                continue
            arrow = tokens.index(">")
            src_raw = tokens[arrow - 1]
            dst_raw = tokens[arrow + 1].rstrip(":")
        except Exception:
            # Anything that does not look like an IP line is ignored.
            continue
        found = length_re.search(detail)
        length = int(found.group(1)) if found else 0
        if length <= 0:
            continue
        is_v6 = family == "IP6"
        src, sport = split_host_port(src_raw, is_v6)
        dst, dport = split_host_port(dst_raw, is_v6)
        if src and dst:
            packets.append(Packet(ts, length, src, dst, sport, dport, family))
    return packets
def split_host_port(value, is_ipv6):
    """Split a tcpdump ``address.port`` token into ``(host, port)``.

    tcpdump appends the port to the address with a dot and prints the bare
    address for port-less protocols (e.g. ICMP). A bare address must not lose
    its last component to the "port":

    * IPv4: a plain dotted quad (``10.0.0.1``) has no port.
    * IPv6: the part after the last colon holds no dot for a bare address and
      exactly three dots for an embedded IPv4 tail (``::ffff:10.0.0.1``);
      neither case carries a port.

    Args:
        value: address token; trailing colons are ignored.
        is_ipv6: whether the token comes from an ``IP6`` line.

    Returns:
        ``(host, port)``; *port* is ``""`` when the token has no port.
    """
    value = value.rstrip(":")
    if is_ipv6:
        tail_dots = value.rsplit(":", 1)[-1].count(".")
        if tail_dots in (0, 3):
            return value, ""
        host, _, port = value.rpartition(".")
        return host, port
    host, sep, port = value.rpartition(".")
    if not sep or not port.isdigit():
        return value, ""
    octets = value.split(".")
    if len(octets) == 4 and all(octet.isdigit() for octet in octets):
        # Bare dotted quad: the last octet is part of the address, not a port.
        return value, ""
    return host, port
def parse_packets(pcap_file):
    """Parse *pcap_file*, preferring tshark and falling back to tcpdump.

    Returns:
        ``(packets, parser_name)`` where *parser_name* is ``"tshark"`` or
        ``"tcpdump"``. A tshark failure is printed as a warning and the
        tcpdump parser is tried instead.
    """
    if not tshark_available():
        return parse_packets_with_tcpdump(pcap_file), "tcpdump"
    try:
        packets = parse_packets_with_tshark(pcap_file)
    except Exception as exc:
        print("[warn] tshark nie udal sie ({}), probuje fallback przez tcpdump -r".format(exc))
        return parse_packets_with_tcpdump(pcap_file), "tcpdump"
    return packets, "tshark"
def reverse_resolve(ip, cache):
    """Return the reverse-DNS name for *ip*, memoised in *cache*.

    A failed lookup stores and returns *ip* itself, so each address is
    looked up at most once per cache.
    """
    if ip not in cache:
        try:
            cache[ip] = socket.gethostbyaddr(ip)[0]
        except Exception:
            cache[ip] = ip
    return cache[ip]
def top_n(counter, limit):
    """Return the first *limit* ``(key, value)`` pairs, largest value first.

    Ties are ordered by key ascending.
    """
    def rank(entry):
        key, amount = entry
        return (-amount, key)

    ranked = sorted(counter.items(), key=rank)
    return ranked[:limit]
def validate_lengths(packets):
    """Sanity-check parsed packet sizes.

    Does nothing for an empty list.

    Raises:
        AnalysisError: the largest length exceeds 10**7 or the average length
            exceeds 10**6, which points at a mis-parsed capture.
    """
    if not packets:
        return
    lengths = [p.length for p in packets]
    max_len = max(lengths)
    avg_len = sum(lengths) / float(len(lengths))
    if max_len <= 10 ** 7 and avg_len <= 10 ** 6:
        return
    raise AnalysisError(
        "Parser zwrocil nierealne dlugosci pakietow (max={}, avg={:.1f}). "
        "To zwykle oznacza zly parse outputu tcpdump -r; sprobuj z nowsza wersja skryptu albo tshark."
        .format(max_len, avg_len)
    )
def analyze_packets(packets, bucket_seconds, top_ip_limit, top_pair_limit, local_ips=None):
    """Aggregate parsed packets into totals, rankings and time buckets.

    With a non-empty *local_ips* ("local mode") each local<->remote packet is
    credited to the remote address: local -> remote counts as outbound,
    remote -> local as inbound. Local<->local traffic is tracked separately as
    internal pairs, and packets touching no local address only add to the
    unmatched byte count. Without *local_ips* both endpoints of every packet
    are credited (source as outbound, destination as inbound).

    Protocol totals and per-bucket byte/packet totals always include every
    packet; port statistics include only packets that were credited to an IP.

    Args:
        packets: list of ``Packet`` objects.
        bucket_seconds: bucket width in seconds; must be positive.
        top_ip_limit: length of the IP and port rankings.
        top_pair_limit: length of the pair rankings.
        local_ips: iterable of local address strings, or ``None``.

    Returns:
        A dict with the capture summary (``first_ts``, ``last_ts``,
        ``duration``, ``packet_count``, ``total_bytes``, ``avg_rate_Bps``,
        ``matched_total_bytes``, ``unmatched_bytes``, ``local_ips``,
        ``local_mode``), the rankings (``per_ip_*``, ``per_pair``,
        ``per_proto``, ``per_port_*``, ``local_total``, ``internal_pairs``)
        and the bucket data (``buckets``, ``interesting``, ``avg_bucket``,
        ``p95_bucket``).

    Raises:
        AnalysisError: *packets* is empty or *bucket_seconds* is not positive.
    """
    if not packets:
        raise AnalysisError("Brak pakietow IP/IPv6 do analizy.")
    if bucket_seconds <= 0:
        # Guard the floor division below; a zero width would raise ZeroDivisionError.
        raise AnalysisError("Rozmiar bucketa musi byc wiekszy od 0 (podano: {}).".format(bucket_seconds))
    local_ips = set(local_ips or [])
    local_mode = bool(local_ips)
    packets = sorted(packets, key=lambda p: p.ts)
    first_ts = packets[0].ts
    last_ts = packets[-1].ts
    duration = max(0.001, last_ts - first_ts)
    total_bytes = sum(p.length for p in packets)

    per_ip_total = collections.Counter()
    per_ip_in = collections.Counter()
    per_ip_out = collections.Counter()
    per_pair = collections.Counter()
    per_proto = collections.Counter()
    per_port_total = collections.Counter()
    per_port_src = collections.Counter()
    per_port_dst = collections.Counter()
    bucket_bytes = collections.Counter()
    bucket_packets = collections.Counter()
    bucket_ip_bytes = collections.defaultdict(collections.Counter)
    bucket_pair_bytes = collections.defaultdict(collections.Counter)
    bucket_port_bytes = collections.defaultdict(collections.Counter)
    local_total = collections.Counter()
    internal_pairs = collections.Counter()
    unmatched_bytes = 0

    for p in packets:
        size = p.length
        pair_label = "{} -> {}".format(p.src, p.dst)
        bucket_ips = ()      # addresses credited inside this packet's bucket
        bucket_pair = None   # pair credited inside this packet's bucket
        ports_track = False

        if not local_mode:
            per_ip_total[p.src] += size
            per_ip_total[p.dst] += size
            per_ip_out[p.src] += size
            per_ip_in[p.dst] += size
            per_pair[pair_label] += size
            bucket_ips = (p.src, p.dst)
            bucket_pair = pair_label
            ports_track = True
        else:
            src_is_local = p.src in local_ips
            dst_is_local = p.dst in local_ips
            if src_is_local and dst_is_local:
                internal_pairs[pair_label] += size
                local_total[p.src] += size
                local_total[p.dst] += size
            elif src_is_local:
                per_ip_total[p.dst] += size
                per_ip_out[p.dst] += size
                per_pair[pair_label] += size
                local_total[p.src] += size
                bucket_ips = (p.dst,)
                bucket_pair = pair_label
                ports_track = True
            elif dst_is_local:
                per_ip_total[p.src] += size
                per_ip_in[p.src] += size
                per_pair[pair_label] += size
                local_total[p.dst] += size
                bucket_ips = (p.src,)
                bucket_pair = pair_label
                ports_track = True
            else:
                unmatched_bytes += size

        if p.proto:
            per_proto[p.proto] += size
        idx = int((p.ts - first_ts) // bucket_seconds)
        bucket_bytes[idx] += size
        bucket_packets[idx] += 1
        if ports_track:
            if p.sport:
                sport = str(p.sport)
                per_port_src[sport] += size
                per_port_total[sport] += size
                bucket_port_bytes[idx]["src:{}".format(sport)] += size
            if p.dport:
                dport = str(p.dport)
                per_port_dst[dport] += size
                per_port_total[dport] += size
                bucket_port_bytes[idx]["dst:{}".format(dport)] += size
        for ip in bucket_ips:
            bucket_ip_bytes[idx][ip] += size
        if bucket_pair is not None:
            bucket_pair_bytes[idx][bucket_pair] += size

    buckets = []
    for idx, total in bucket_bytes.items():
        start_ts = first_ts + idx * bucket_seconds
        buckets.append(
            BucketSummary(
                start_ts=start_ts,
                end_ts=start_ts + bucket_seconds,
                bytes_total=total,
                packets_total=bucket_packets[idx],
                top_ips=top_n(bucket_ip_bytes[idx], 3),
                top_pairs=top_n(bucket_pair_bytes[idx], 3),
                top_ports=top_n(bucket_port_bytes[idx], 3),
            )
        )
    # Busiest buckets first; ties keep chronological order.
    buckets.sort(key=lambda b: (-b.bytes_total, b.start_ts))

    bucket_values = list(bucket_bytes.values())
    avg_bucket = statistics.mean(bucket_values)
    p95_bucket = quantile(bucket_values, 0.95)

    # A bucket is "interesting" when it reaches both the p95 and 1.8x the mean.
    threshold = max(p95_bucket, avg_bucket * 1.8)
    interesting = []
    used_starts = []
    for b in buckets:
        if b.bytes_total < threshold:
            continue
        if any(abs(b.start_ts - prev_start) < bucket_seconds for prev_start in used_starts):
            continue
        used_starts.append(b.start_ts)
        interesting.append(b)
        if len(interesting) >= 5:
            break
    if not interesting:
        # Nothing stands out: fall back to the (up to) three busiest buckets.
        interesting = buckets[: min(3, len(buckets))]

    matched_total = sum(per_ip_total.values()) if local_mode else total_bytes
    return {
        "first_ts": first_ts,
        "last_ts": last_ts,
        "duration": duration,
        "packet_count": len(packets),
        "total_bytes": total_bytes,
        "matched_total_bytes": matched_total,
        "unmatched_bytes": unmatched_bytes,
        "local_ips": sorted(local_ips),
        "local_total": top_n(local_total, top_ip_limit),
        "internal_pairs": top_n(internal_pairs, top_pair_limit),
        "avg_rate_Bps": total_bytes / duration,
        "per_ip_total": top_n(per_ip_total, top_ip_limit),
        "per_ip_in": top_n(per_ip_in, top_ip_limit),
        "per_ip_out": top_n(per_ip_out, top_ip_limit),
        "per_pair": top_n(per_pair, top_pair_limit),
        "per_proto": top_n(per_proto, 10),
        "per_port_total": top_n(per_port_total, top_ip_limit),
        "per_port_src": top_n(per_port_src, top_ip_limit),
        "per_port_dst": top_n(per_port_dst, top_ip_limit),
        "buckets": buckets,
        "interesting": interesting,
        "avg_bucket": avg_bucket,
        "p95_bucket": p95_bucket,
        "local_mode": local_mode,
    }
def quantile(values, q):
    """Return the *q*-quantile of *values* using linear interpolation.

    The position is ``(len(values) - 1) * q`` in the sorted data; an empty
    input yields ``0.0``.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q
    lo, hi = int(math.floor(pos)), int(math.ceil(pos))
    if lo != hi:
        # Position falls between two samples: blend them by the fractional part.
        frac = pos - lo
        return ordered[lo] * (1 - frac) + ordered[hi] * frac
    return float(ordered[lo])
def fmt_ts(epoch):
    """Format an epoch timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time."""
    moment = dt.datetime.fromtimestamp(epoch)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
def render_report(stats, resolve=False):
    """Render the plain-text report for a stats dict from ``analyze_packets``.

    Args:
        stats: analysis result dict.
        resolve: when true, addresses are shown as ``name (ip)`` whenever a
            reverse lookup returns a name different from the address.

    Returns:
        The report as a single newline-joined string.
    """
    dns_cache = {}
    local_mode = stats.get("local_mode")

    def pretty_ip(ip):
        if not resolve:
            return ip
        name = reverse_resolve(ip, dns_cache)
        if name == ip:
            return ip
        return "{} ({})".format(name, ip)

    rule = "=" * 88
    avg_packet = stats["total_bytes"] / float(max(1, stats["packet_count"]))
    lines = [
        rule,
        "PODSUMOWANIE RUCHU",
        rule,
        "Okno: {} .. {}".format(fmt_ts(stats["first_ts"]), fmt_ts(stats["last_ts"])),
        "Czas: {:.1f} s".format(stats["duration"]),
        "Pakiety: {}".format(stats["packet_count"]),
        "Wolumen: {}".format(human_bytes(stats["total_bytes"])),
        "Srednia: {}".format(human_bps(stats["avg_rate_Bps"])),
        "Bucket avg: {}".format(human_bytes(stats["avg_bucket"])),
        "Bucket p95: {}".format(human_bytes(stats["p95_bucket"])),
        "Sredni pakiet: {}".format(human_bytes(avg_packet)),
    ]
    if local_mode:
        lines.append("Lokalne IP: {}".format(", ".join(stats.get("local_ips", [])) or "-"))
        lines.append("Ruch zmapowany do zdalnych hostow: {}".format(human_bytes(stats.get("matched_total_bytes", 0))))
        if stats.get("unmatched_bytes", 0):
            lines.append("Ruch poza lokalnymi IP: {}".format(human_bytes(stats.get("unmatched_bytes", 0))))
    lines.append("")

    def section(title, items, formatter=None, show_avg=False):
        # One ranked table: title, underline, numbered rows, trailing blank line.
        lines.append(title)
        lines.append("-" * len(title))
        rank = 0
        for rank, (key, value) in enumerate(items, start=1):
            label = formatter(key) if formatter else key
            if show_avg:
                avg = value / max(0.001, stats["duration"])
                row = "{:>2}. {:<52} {:>12} avg={:>12}".format(
                    rank, label[:52], human_bytes(value), human_bps(avg)
                )
            else:
                row = "{:>2}. {:<52} {:>12}".format(rank, label[:52], human_bytes(value))
            lines.append(row)
        if rank == 0:
            lines.append("(brak danych)")
        lines.append("")

    ip_sections = (
        ("Top zdalne IP lacznie", "Top IP lacznie", "per_ip_total"),
        ("Top zdalne IP inbound do serwera", "Top IP inbound do serwera", "per_ip_in"),
        ("Top zdalne IP outbound z serwera", "Top IP outbound z serwera", "per_ip_out"),
    )
    for local_title, generic_title, key in ip_sections:
        section(local_title if local_mode else generic_title, stats[key], pretty_ip, show_avg=True)
    section("Top pary serwer <-> zdalny host" if local_mode else "Top pary src -> dst", stats["per_pair"])
    plain_sections = (
        ("Top porty lacznie", "per_port_total"),
        ("Top porty source", "per_port_src"),
        ("Top porty destination", "per_port_dst"),
        ("Top protokoly", "per_proto"),
    )
    for title, key in plain_sections:
        section(title, stats[key])

    moments_title = "Najciekawsze momenty"
    lines.append(moments_title)
    lines.append("-" * len(moments_title))
    for idx, bucket in enumerate(stats["interesting"], start=1):
        span = max(1.0, bucket.end_ts - bucket.start_ts)
        headline = "{}. {} .. {} | {} | {} | pakiety={}".format(
            idx,
            fmt_ts(bucket.start_ts),
            fmt_ts(bucket.end_ts),
            human_bytes(bucket.bytes_total),
            human_bps(bucket.bytes_total / span),
            bucket.packets_total,
        )
        top_ips = ", ".join("{} [{}]".format(pretty_ip(ip), human_bytes(b)) for ip, b in bucket.top_ips)
        top_pairs = ", ".join("{} [{}]".format(pair, human_bytes(b)) for pair, b in bucket.top_pairs)
        top_ports = ", ".join("{} [{}]".format(port, human_bytes(b)) for port, b in bucket.top_ports)
        lines.append(headline)
        lines.append(" Top IP: {}".format(top_ips or "-"))
        lines.append(" Top pair: {}".format(top_pairs or "-"))
        lines.append(" Top port: {}".format(top_ports or "-"))
    lines.append("")
    return "\n".join(lines)
def interactive_shell(stats, resolve):
    """Run a small read-eval loop for browsing an analysis result.

    Commands: ``top`` (full report), ``ip <addr>``, ``moments``, ``pairs``,
    ``ports`` and ``quit``/``q``/``exit``. End-of-input or Ctrl-C at the
    prompt also leaves the loop. Per-address figures come from the ranked
    lists stored in *stats*, so an address missing from them shows zeros.
    """
    dns_cache = {}
    totals = dict(stats["per_ip_total"])
    inbound = dict(stats["per_ip_in"])
    outbound = dict(stats["per_ip_out"])

    def pretty(ip):
        if not resolve:
            return ip
        name = reverse_resolve(ip, dns_cache)
        return ip if name == ip else "{} ({})".format(name, ip)

    def show_top():
        print(render_report(stats, resolve=resolve))

    def show_moments():
        print("Najciekawsze momenty:")
        for i, bucket in enumerate(stats["interesting"], start=1):
            span = max(1.0, bucket.end_ts - bucket.start_ts)
            print(
                " {}. {} .. {} | {} | {} | pakiety={}".format(
                    i,
                    fmt_ts(bucket.start_ts),
                    fmt_ts(bucket.end_ts),
                    human_bytes(bucket.bytes_total),
                    human_bps(bucket.bytes_total / span),
                    bucket.packets_total,
                )
            )

    def show_pairs():
        print("Top pary:")
        for i, (pair, b) in enumerate(stats["per_pair"], start=1):
            print(" {:>2}. {:<52} {:>12}".format(i, pair[:52], human_bytes(b)))

    def show_port_table(title, rows):
        print(title)
        for i, (port, b) in enumerate(rows, start=1):
            print(" {:>2}. {:<20} {:>12}".format(i, port[:20], human_bytes(b)))

    def show_ports():
        show_port_table("Top porty lacznie:", stats["per_port_total"])
        show_port_table("Top porty source:", stats["per_port_src"])
        show_port_table("Top porty destination:", stats["per_port_dst"])

    def show_ip(ip):
        span = max(0.001, stats["duration"])
        total_b = totals.get(ip, 0)
        in_b = inbound.get(ip, 0)
        out_b = outbound.get(ip, 0)
        print("{}".format(pretty(ip)))
        print(" total: {}".format(human_bytes(total_b)))
        print(" inbound: {}".format(human_bytes(in_b)))
        print(" outbound: {}".format(human_bytes(out_b)))
        print(" avg total: {}".format(human_bps(total_b / span)))
        print(" avg inbound: {}".format(human_bps(in_b / span)))
        print(" avg outbound: {}".format(human_bps(out_b / span)))

    # Exact-match commands; "ip <addr>" is handled separately below.
    commands = {
        "top": show_top,
        "moments": show_moments,
        "pairs": show_pairs,
        "ports": show_ports,
    }

    print("Tryb interaktywny. Komendy: top, ip <addr>, moments, pairs, ports, quit")
    while True:
        try:
            raw = input("traffic> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            return
        if not raw:
            continue
        if raw in ("q", "quit", "exit"):
            return
        handler = commands.get(raw)
        if handler is not None:
            handler()
        elif raw.startswith("ip "):
            show_ip(raw[3:].strip())
        else:
            print("Nieznana komenda. Uzyj: top, ip <addr>, moments, pairs, ports, quit")
def parse_args():
    """Parse and validate the command line.

    Besides argparse's own checks, a non-positive ``--bucket`` is rejected
    (it would break bucket indexing during analysis), and a non-positive
    ``--duration`` is rejected in capture mode, i.e. when ``--read`` is not
    given. Rejections go through ``ArgumentParser.error``, which prints the
    usage text and exits with status 2.

    Returns:
        The populated ``argparse.Namespace``.
    """
    p = argparse.ArgumentParser(description="Capture + hotspot report for tcpdump traffic windows")
    p.add_argument("-i", "--interface", help="interfejs, np. eth0 albo any")
    p.add_argument("-d", "--duration", type=int, default=900, help="czas capture w sekundach, domyslnie 900")
    p.add_argument("-f", "--filter", default="", help="capture filter tcpdump/libpcap")
    p.add_argument("-o", "--output", help="plik .pcap/.pcapng")
    p.add_argument("--read", help="analizuj istniejacy plik .pcap/.pcapng zamiast robic capture")
    p.add_argument("--delete-pcap", action="store_true", help="usun plik .pcap po zapisaniu raportow")
    p.add_argument("--server-ip", action="append", help="lokalne IP serwera; mozna podac wiele razy albo lista po przecinku")
    p.add_argument("--bucket", type=int, default=30, help="bucket czasowy dla spike, domyslnie 30 s")
    p.add_argument("--top", type=int, default=15, help="ile IP pokazac w topkach")
    p.add_argument("--top-pairs", type=int, default=15, help="ile par src->dst pokazac")
    p.add_argument("--snaplen", type=int, default=0, help="snaplen dla tcpdump; 0 = pelny pakiet")
    p.add_argument("--buffer-kib", type=int, default=8192, help="buffer tcpdump w KiB")
    p.add_argument("--no-promisc", action="store_true", help="dodaj -p do tcpdump")
    p.add_argument("--resolve", action="store_true", help="reverse DNS dla IP w raporcie")
    p.add_argument("--no-interactive", action="store_true", help="po raporcie nie odpalaj prompta")
    args = p.parse_args()
    # Fail before a (possibly long) capture instead of crashing during analysis.
    if args.bucket <= 0:
        p.error("--bucket musi byc wieksze od 0")
    if not args.read and args.duration <= 0:
        p.error("--duration musi byc wieksze od 0")
    return args
def main():
    """Script entry point: capture (or read) a pcap, analyse it, write reports.

    Writes ``<pcap base>.report.txt`` and ``<pcap base>.report.json`` next to
    the capture file, optionally removes the capture file and, unless
    disabled, starts the interactive prompt.

    Returns:
        ``0`` on success, ``2`` when a ``RuntimeError`` (including
        ``CaptureError`` and ``AnalysisError``) was reported.
    """
    args = parse_args()
    try:
        local_ips = normalize_local_ips(args.server_ip)
        if not args.read:
            if not args.interface:
                args.interface = select_interface()
            if not args.output:
                stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
                args.output = os.path.abspath("traffic_{}_{}.pcap".format(args.interface, stamp))
            if not local_ips:
                auto_local_ips = detect_local_ips(args.interface)
                if auto_local_ips:
                    local_ips = set(auto_local_ips)
                    print("Auto-detekcja lokalnych IP: {}".format(", ".join(sorted(local_ips))))
                else:
                    print("[warn] Nie udalo sie wykryc lokalnych IP. Ranking IP bedzie liczony dla obu stron ruchu.")
            pcap_path = args.output
            run_capture(
                iface=args.interface,
                duration_s=args.duration,
                capture_filter=args.filter,
                output_file=pcap_path,
                snaplen=args.snaplen,
                buffer_kib=args.buffer_kib,
                no_promisc=args.no_promisc,
            )
        else:
            pcap_path = os.path.abspath(args.read)
            if not os.path.exists(pcap_path):
                raise RuntimeError("Nie istnieje plik do analizy: {}".format(pcap_path))
            print("Tryb read: {}".format(pcap_path))
            if local_ips:
                print("Lokalne IP z parametru: {}".format(", ".join(sorted(local_ips))))
            else:
                print("[warn] Brak --server-ip. Ranking IP bedzie liczony dla obu stron ruchu, wiec lokalne IP serwera moze dominowac.")

        packets, parser_name = parse_packets(pcap_path)
        print("Analiza parserem: {}".format(parser_name))
        validate_lengths(packets)
        stats = analyze_packets(
            packets=packets,
            bucket_seconds=args.bucket,
            top_ip_limit=args.top,
            top_pair_limit=args.top_pairs,
            local_ips=local_ips,
        )
        report = render_report(stats, resolve=args.resolve)
        print("\n" + report)

        # Both reports sit next to the capture file and share its base name.
        base = os.path.splitext(pcap_path)[0]
        report_path = base + ".report.txt"
        json_path = base + ".report.json"
        with open(report_path, "w", encoding="utf-8") as fh:
            fh.write(report + "\n")

        summary = {
            key: stats[key]
            for key in ("first_ts", "last_ts", "duration", "packet_count", "total_bytes", "avg_rate_Bps")
        }
        summary["local_ips"] = stats.get("local_ips", [])
        summary["matched_total_bytes"] = stats.get("matched_total_bytes", 0)
        summary["unmatched_bytes"] = stats.get("unmatched_bytes", 0)
        payload = {"summary": summary}
        for key in (
            "per_ip_total",
            "per_ip_in",
            "per_ip_out",
            "per_pair",
            "per_proto",
            "per_port_total",
            "per_port_src",
            "per_port_dst",
        ):
            payload[key] = stats[key]
        payload["interesting"] = [
            {
                "start_ts": x.start_ts,
                "end_ts": x.end_ts,
                "bytes_total": x.bytes_total,
                "packets_total": x.packets_total,
                "top_ips": x.top_ips,
                "top_pairs": x.top_pairs,
                "top_ports": x.top_ports,
            }
            for x in stats["interesting"]
        ]
        with open(json_path, "w", encoding="utf-8") as fh:
            json.dump(payload, fh, ensure_ascii=False, indent=2)

        print("Raport TXT: {}".format(report_path))
        print("Raport JSON: {}".format(json_path))
        if args.delete_pcap:
            try:
                os.remove(pcap_path)
                print("Usunieto PCAP: {}".format(pcap_path))
            except OSError as exc:
                print("[warn] Nie udalo sie usunac PCAP {}: {}".format(pcap_path, exc), file=sys.stderr)
        if not args.no_interactive:
            interactive_shell(stats, resolve=args.resolve)
        return 0
    except RuntimeError as exc:
        # CaptureError and AnalysisError are RuntimeError subclasses.
        print("[blad] {}".format(exc), file=sys.stderr)
        return 2
# Run as a script: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())