#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# example: python3 zfs_probe.py --duration 10 --interval 1 --track-files
#
# Samples zpool iostat, ARC kstats and per-process /proc/<pid>/io deltas for a
# fixed duration, optionally attaches a bpftrace program, then prints top-N
# summary tables and writes raw data as JSON/text files into an output dir.

import argparse
import ast
import json
import math
import os
import re
import shutil
import signal
import statistics
import subprocess
import sys
import time
from collections import defaultdict
from datetime import datetime


def eprint(msg):
    """Write *msg* to stderr, newline-terminated, and flush immediately."""
    sys.stderr.write(str(msg) + "\n")
    sys.stderr.flush()


def oprint(msg=""):
    """Write *msg* to stdout, newline-terminated, and flush immediately."""
    sys.stdout.write(str(msg) + "\n")
    sys.stdout.flush()


def human_bytes(n):
    """Format a byte count as a short human-readable string.

    Returns "-" for None; negative values keep their sign. Plain bytes are
    printed as an integer ("512 B"), larger units with one decimal.
    """
    if n is None:
        return "-"
    neg = n < 0
    n = abs(float(n))
    units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
    for unit in units:
        if n < 1024.0 or unit == units[-1]:
            s = "{0:.1f} {1}".format(n, unit) if unit != "B" else "{0} B".format(int(n))
            return "-" + s if neg else s
        n /= 1024.0


def human_ns_to_ms(n):
    """Format a nanosecond value as milliseconds ("1.23 ms"); "-" on None/bad input."""
    if n is None:
        return "-"
    try:
        return "{0:.2f} ms".format(float(n) / 1000000.0)
    except Exception:
        return "-"


def mean_or_zero(values):
    """Arithmetic mean of *values*, or 0.0 for an empty sequence."""
    return statistics.mean(values) if values else 0.0


def percentile(values, q):
    """Linearly interpolated q-quantile (0.0..1.0) of *values*; 0.0 if empty."""
    if not values:
        return 0.0
    if len(values) == 1:
        return float(values[0])
    values = sorted(values)
    pos = (len(values) - 1) * q
    lo = int(math.floor(pos))
    hi = int(math.ceil(pos))
    if lo == hi:
        return float(values[lo])
    frac = pos - lo
    return values[lo] * (1.0 - frac) + values[hi] * frac


def run_cmd(cmd, timeout=None):
    """Run *cmd* (argv list, no shell) and return (returncode, stdout, stderr)."""
    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        timeout=timeout,
        check=False,
    )
    return proc.returncode, proc.stdout, proc.stderr


def command_exists(name):
    """True if *name* resolves to an executable on PATH."""
    return shutil.which(name) is not None


def read_file(path, binary=False):
    """Read and return the full contents of *path* (text by default)."""
    mode = "rb" if binary else "r"
    with open(path, mode) as f:
        return f.read()


def safe_read_text(path):
    """Best-effort text read: return file contents, or "" on any error."""
    try:
        return read_file(path, binary=False)
    except Exception:
        return ""


def ensure_dir(path):
    """Create *path* (and parents) if missing; no-op when it already exists."""
    # exist_ok avoids the check-then-create race of the isdir/makedirs pair.
    os.makedirs(path, exist_ok=True)


def parse_proc_io_text(text):
    """Parse /proc/<pid>/io content into a {field: int} dict.

    Lines without a colon or with a non-integer value are skipped.
    """
    out = {}
    for line in text.splitlines():
        if ":" not in line:
            continue
        k, v = line.split(":", 1)
        try:
            out[k.strip()] = int(v.strip())
        except ValueError:
            continue
    return out


def read_proc_snapshot():
    """Snapshot per-process I/O counters from /proc.

    Returns a dict keyed by "pid:starttime" (so a recycled PID is not
    mistaken for the same process) with comm, cmdline and the io counters.
    Processes that vanish or deny access mid-read are skipped silently.
    """
    snap = {}
    for pid in os.listdir("/proc"):
        if not pid.isdigit():
            continue
        base = os.path.join("/proc", pid)
        try:
            io_txt = read_file(os.path.join(base, "io"))
            stat_txt = read_file(os.path.join(base, "stat"))
            comm = safe_read_text(os.path.join(base, "comm")).strip() or "?"
            cmdline_raw = read_file(os.path.join(base, "cmdline"), binary=True)
            cmdline = cmdline_raw.replace(b"\x00", b" ").decode("utf-8", "replace").strip()
            # proc(5): field 2 (comm) may contain spaces and parentheses, so a
            # naive split() can shift every later field. Parse the fields that
            # follow the LAST ')' instead; starttime is overall field 22,
            # i.e. index 19 among the fields after comm (state is index 0).
            after_comm = stat_txt[stat_txt.rfind(")") + 1:].split()
            starttime = int(after_comm[19])
            io = parse_proc_io_text(io_txt)
            key = "{0}:{1}".format(pid, starttime)
            snap[key] = {
                "pid": int(pid),
                "starttime": starttime,
                "comm": comm,
                "cmdline": cmdline,
                "io": io,
            }
        except (FileNotFoundError, ProcessLookupError, PermissionError, IndexError, ValueError):
            continue
        except Exception:
            continue
    return snap


def diff_proc_snapshots(prev, curr, accum):
    """Accumulate per-process io-counter deltas between two snapshots into *accum*.

    Only processes present in both snapshots are counted. Negative deltas
    (counter resets) are clamped to 0. Also tracks the largest single-interval
    read/write burst per process.
    """
    tracked = ["rchar", "wchar", "syscr", "syscw", "read_bytes", "write_bytes", "cancelled_write_bytes"]
    for key, cur in curr.items():
        if key not in prev:
            continue
        old = prev[key]
        entry = accum.setdefault(key, {
            "pid": cur["pid"],
            "comm": cur["comm"],
            "cmdline": cur["cmdline"],
            "samples": 0,
            "rchar": 0,
            "wchar": 0,
            "syscr": 0,
            "syscw": 0,
            "read_bytes": 0,
            "write_bytes": 0,
            "cancelled_write_bytes": 0,
            "max_interval_read_bytes": 0,
            "max_interval_write_bytes": 0,
        })
        entry["samples"] += 1
        if cur.get("cmdline"):
            entry["cmdline"] = cur["cmdline"]
        for k in tracked:
            delta = cur["io"].get(k, 0) - old["io"].get(k, 0)
            if delta < 0:
                delta = 0
            entry[k] += delta
            if k == "read_bytes" and delta > entry["max_interval_read_bytes"]:
                entry["max_interval_read_bytes"] = delta
            if k == "write_bytes" and delta > entry["max_interval_write_bytes"]:
                entry["max_interval_write_bytes"] = delta


def get_pools(pool_arg=None):
    """List ZFS pool names, optionally filtered by a comma-separated *pool_arg*.

    Raises RuntimeError when `zpool list` fails.
    """
    cmd = ["zpool", "list", "-H", "-o", "name"]
    rc, out, err = run_cmd(cmd, timeout=20)
    if rc != 0:
        raise RuntimeError("Nie mogę pobrać listy puli: {0}".format(err.strip()))
    pools = [x.strip() for x in out.splitlines() if x.strip()]
    if pool_arg:
        wanted = [x.strip() for x in pool_arg.split(",") if x.strip()]
        pools = [p for p in pools if p in wanted]
    return pools


def zpool_status_text(pools):
    """Return raw `zpool status` output for *pools*."""
    cmd = ["zpool", "status"] + pools
    return run_cmd(cmd, timeout=30)[1]


def zpool_history_text(pools):
    """Return `zpool history -il` output; best-effort (error text on failure)."""
    if not pools:
        return ""
    cmd = ["zpool", "history", "-il"] + pools
    rc, out, err = run_cmd(cmd, timeout=60)
    if rc != 0:
        return "zpool history niedostępne: {0}\n".format(err.strip())
    return out


def zfs_get_properties(pools):
    """Collect caching/layout-related dataset properties recursively.

    Returns a list of {name, property, value} rows, or a single
    {"error": ...} row when the command fails.
    """
    props = "atime,relatime,primarycache,secondarycache,prefetch,recordsize,mountpoint"
    cmd = ["zfs", "get", "-H", "-r", "-o", "name,property,value", props] + pools
    rc, out, err = run_cmd(cmd, timeout=60)
    if rc != 0:
        return [{"error": err.strip()}]
    rows = []
    for line in out.splitlines():
        parts = line.split("\t")
        if len(parts) >= 3:
            rows.append({"name": parts[0], "property": parts[1], "value": parts[2]})
    return rows


def parse_arcstats():
    """Parse /proc/spl/kstat/zfs/arcstats into {name: int}; None if unavailable."""
    path = "/proc/spl/kstat/zfs/arcstats"
    if not os.path.exists(path):
        return None
    raw = safe_read_text(path)
    data = {}
    for line in raw.splitlines():
        parts = line.split()
        # Skip the kstat header rows ("name type data" / class line).
        if len(parts) >= 3 and parts[0] not in ("name", "class"):
            try:
                data[parts[0]] = int(parts[2])
            except Exception:
                continue
    return data or None


def arc_delta(prev, curr):
    """Per-interval deltas of selected ARC hit/miss counters; None if either side missing."""
    if not prev or not curr:
        return None
    keys = [
        "hits", "misses",
        "demand_data_hits", "demand_data_misses",
        "demand_metadata_hits", "demand_metadata_misses",
        "prefetch_data_hits", "prefetch_data_misses",
        "prefetch_metadata_hits", "prefetch_metadata_misses",
        "l2_hits", "l2_misses",
    ]
    out = {}
    for k in keys:
        # Clamp to 0 so counter resets do not produce negative deltas.
        out[k] = max(0, curr.get(k, 0) - prev.get(k, 0))
    return out


def parse_zpool_iostat_once(pool_list, interval):
    """Take one `zpool iostat` sample of length *interval* seconds.

    Works with different zpool iostat versions: tries with -l (latency
    columns) first, then without. Does not assume a timestamp column is
    present. Returns a list of per-pool row dicts; raises RuntimeError when
    every command variant fails.
    """
    candidate_cmds = [
        ["zpool", "iostat", "-H", "-p", "-y", "-l"] + pool_list + [str(interval), "1"],
        ["zpool", "iostat", "-H", "-p", "-y"] + pool_list + [str(interval), "1"],
    ]
    last_err = ""
    out = ""
    for cmd in candidate_cmds:
        rc, out, err = run_cmd(cmd, timeout=interval + 20)
        if rc == 0 and out.strip():
            break
        last_err = (err or out or "").strip()
        out = ""
    if not out.strip():
        raise RuntimeError("zpool iostat failed: {0}".format(last_err))
    now_ts = int(time.time())
    rows = []
    for line in out.splitlines():
        line = line.strip()
        if not line:
            continue
        parts = line.split("\t") if "\t" in line else line.split()
        if len(parts) < 7:
            continue
        if parts[0].isdigit():
            # Leading all-digit column is a timestamp some versions emit.
            ts = int(parts[0])
            data = parts[1:]
        else:
            ts = now_ts
            data = parts
        if len(data) < 7:
            continue
        try:
            row = {
                "ts": ts,
                "name": data[0],
                "alloc": int(data[1]),
                "free": int(data[2]),
                "rops": int(data[3]),
                "wops": int(data[4]),
                "rbytes": int(data[5]),
                "wbytes": int(data[6]),
            }
        except ValueError:
            continue
        # Optional -l latency columns, in the order zpool iostat emits them.
        latency_names = [
            "total_wait_ns", "disk_wait_ns", "syncq_wait_ns",
            "asyncq_wait_ns", "scrub_wait_ns", "trim_wait_ns", "rebuild_wait_ns",
        ]
        for i, key in enumerate(latency_names, start=7):
            if i < len(data):
                try:
                    row[key] = int(data[i])
                except ValueError:
                    row[key] = 0
        rows.append(row)
    return rows


def summarize_samples(samples_by_pool):
    """Reduce per-pool iostat samples to avg/p95/max throughput and wait stats."""
    summary = {}
    for pool, samples in samples_by_pool.items():
        rb = [s["rbytes"] for s in samples]
        wb = [s["wbytes"] for s in samples]
        ro = [s["rops"] for s in samples]
        wo = [s["wops"] for s in samples]
        tw = [s.get("total_wait_ns", 0) for s in samples]
        dw = [s.get("disk_wait_ns", 0) for s in samples]
        summary[pool] = {
            "samples": len(samples),
            "read_avg": mean_or_zero(rb),
            "write_avg": mean_or_zero(wb),
            "read_p95": percentile(rb, 0.95),
            "write_p95": percentile(wb, 0.95),
            "read_max": max(rb) if rb else 0,
            "write_max": max(wb) if wb else 0,
            "rops_avg": mean_or_zero(ro),
            "wops_avg": mean_or_zero(wo),
            "total_wait_avg_ns": mean_or_zero(tw),
            "disk_wait_avg_ns": mean_or_zero(dw),
            "total_wait_p95_ns": percentile(tw, 0.95),
            "disk_wait_p95_ns": percentile(dw, 0.95),
        }
    return summary


def top_entries(entries, key, topn):
    """Top *topn* dicts from *entries* by *key*, excluding zero/absent values."""
    vals = [v for v in entries if v.get(key, 0) > 0]
    vals.sort(key=lambda x: x.get(key, 0), reverse=True)
    return vals[:topn]


def start_bpftrace(outdir, track_files=False):
    """Write and launch a bpftrace program collecting per-comm I/O maps.

    Returns (handle_dict, None) on success or (None, reason) when bpftrace is
    skipped (not root / not installed). The handle keeps the Popen object,
    the open output file and both file paths for later stop/parse.
    """
    if os.geteuid() != 0:
        return None, "bpftrace pominięty: uruchom jako root."
    if not command_exists("bpftrace"):
        return None, "bpftrace pominięty: brak polecenia bpftrace."
    program = []
    program.append('tracepoint:syscalls:sys_exit_read /args.ret > 0/ { @read_bytes_by_comm[comm] = sum(args.ret); @read_calls_by_comm[comm] = count(); }')
    program.append('tracepoint:block:block_rq_issue { @block_bytes_by_comm[comm] = sum(args.bytes); @block_ios_by_comm[comm] = count(); }')
    if track_files:
        program.append('tracepoint:syscalls:sys_enter_openat { @opens[str(args.filename)] = count(); @opens_by_comm[comm, str(args.filename)] = count(); }')
    # END block prints each map under a ===SECTION=== marker so
    # parse_bpftrace_output can split the text back into maps.
    program.append('END {')
    program.append(' printf("===READ_BYTES_BY_COMM===\\n"); print(@read_bytes_by_comm);')
    program.append(' printf("===READ_CALLS_BY_COMM===\\n"); print(@read_calls_by_comm);')
    program.append(' printf("===BLOCK_BYTES_BY_COMM===\\n"); print(@block_bytes_by_comm);')
    program.append(' printf("===BLOCK_IOS_BY_COMM===\\n"); print(@block_ios_by_comm);')
    if track_files:
        program.append(' printf("===OPENS===\\n"); print(@opens);')
        program.append(' printf("===OPENS_BY_COMM===\\n"); print(@opens_by_comm);')
    program.append('}')
    bt_path = os.path.join(outdir, "trace.bt")
    with open(bt_path, "w") as f:
        f.write("\n".join(program) + "\n")
    out_path = os.path.join(outdir, "bpftrace.txt")
    out_f = open(out_path, "w")
    try:
        proc = subprocess.Popen(
            ["bpftrace", bt_path],
            stdout=out_f,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except Exception:
        # Don't leak the output handle if bpftrace fails to spawn.
        out_f.close()
        raise
    return {"proc": proc, "out_f": out_f, "out_path": out_path, "bt_path": bt_path}, None


# Marker line emitted by the bpftrace END block, e.g. "===OPENS===".
SECTION_RE = re.compile(r"^===([A-Z0-9_]+)===$")
# A bpftrace map dump line: "@map[key]: value".
MAP_LINE_RE = re.compile(r'^@[^[]+\[(.*)\]:\s+(-?\d+)$')


def parse_bpftrace_output(path):
    """Parse the sectioned bpftrace END-block dump into {SECTION: {key: int}}.

    Map keys are decoded with ast.literal_eval so multi-key maps become
    tuples; plain string keys fall back to the raw text minus quotes.
    Returns an empty defaultdict when *path* is missing/empty.
    """
    result = defaultdict(dict)
    if not path or not os.path.exists(path):
        return result
    section = None
    for raw in safe_read_text(path).splitlines():
        line = raw.strip()
        m = SECTION_RE.match(line)
        if m:
            section = m.group(1)
            continue
        m = MAP_LINE_RE.match(line)
        if not m or not section:
            continue
        key_raw = m.group(1).strip()
        value = int(m.group(2))
        try:
            parsed_key = ast.literal_eval(key_raw)
        except Exception:
            try:
                parsed_key = ast.literal_eval("({0},)".format(key_raw))
            except Exception:
                parsed_key = key_raw.strip('"')
        result[section][parsed_key] = value
    return result


def format_proc_entry(e):
    """One-line "comm [pid N] cmdline" label, cmdline truncated to 120 chars."""
    cmd = e.get("cmdline") or e.get("comm") or "?"
    if len(cmd) > 120:
        cmd = cmd[:117] + "..."
    return '{0} [pid {1}] {2}'.format(e.get("comm", "?"), e.get("pid", "?"), cmd)


def print_table(title, rows, cols):
    """Print a simple fixed-width table; *cols* is a list of (header, row_key)."""
    oprint(title)
    if not rows:
        oprint(" brak danych")
        oprint()
        return
    widths = []
    for col_name, key in cols:
        width = len(col_name)
        for row in rows:
            width = max(width, len(str(row.get(key, ""))))
        widths.append(width)
    header = " " + " ".join(col_name.ljust(widths[i]) for i, (col_name, _) in enumerate(cols))
    oprint(header)
    oprint(" " + " ".join("-" * w for w in widths))
    for row in rows:
        oprint(" " + " ".join(str(row.get(key, "")).ljust(widths[i]) for i, (_, key) in enumerate(cols)))
    oprint()


def stop_bpftrace(handle):
    """Stop a running bpftrace (SIGINT so its END block fires) and close the log.

    Escalates SIGINT -> terminate -> kill; best-effort, never raises.
    """
    if not handle:
        return
    proc = handle["proc"]
    if proc.poll() is not None:
        handle["out_f"].close()
        return
    try:
        # SIGINT lets bpftrace run its END block and flush the maps.
        proc.send_signal(signal.SIGINT)
        proc.wait(timeout=3)
    except Exception:
        try:
            proc.terminate()
            proc.wait(timeout=2)
        except Exception:
            try:
                proc.kill()
                proc.wait(timeout=2)
            except Exception:
                pass
    try:
        handle["out_f"].close()
    except Exception:
        pass


def main():
    """Entry point: sample ZFS/ARC/process I/O for --duration seconds and report."""
    ap = argparse.ArgumentParser(description="Zbiera próbki ZFS/ARC/procesów przez zadany czas i pokazuje top statystyki.")
    ap.add_argument("--duration", type=int, default=3600, help="Czas zbierania w sekundach, np. 7200")
    ap.add_argument("--interval", type=int, default=5, help="Interwał próbek w sekundach")
    ap.add_argument("--pool", default="", help="Opcjonalnie: jedna lub kilka puli, rozdzielone przecinkiem")
    ap.add_argument("--top", type=int, default=15, help="Ile pozycji pokazać w topkach")
    ap.add_argument("--outdir", default="", help="Katalog na logi i JSON")
    ap.add_argument("--track-files", action="store_true", help="Śledź top otwierane pliki przez bpftrace")
    ap.add_argument("--no-bpf", action="store_true", help="Nie uruchamiaj bpftrace nawet gdy jest dostępny")
    args = ap.parse_args()

    if args.interval <= 0 or args.duration <= 0:
        eprint("duration i interval muszą być > 0")
        sys.exit(2)
    if os.geteuid() != 0:
        eprint("Uwaga: bez roota część /proc i bpftrace może być niepełna.")

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    outdir = args.outdir or os.path.abspath("./zfs_probe_{0}".format(ts))
    ensure_dir(outdir)
    oprint("Start. Zbieram dane do: {0}".format(outdir))

    pools = get_pools(args.pool)
    if not pools:
        eprint("Nie znaleziono żadnych puli ZFS.")
        sys.exit(1)
    oprint("Pule: {0}".format(", ".join(pools)))

    meta = {
        "started_at": datetime.now().isoformat(),
        "duration": args.duration,
        "interval": args.interval,
        "pools": pools,
        "hostname": os.uname().nodename,
        "uid": os.geteuid(),
    }
    with open(os.path.join(outdir, "meta.json"), "w") as f:
        json.dump(meta, f, indent=2)
    with open(os.path.join(outdir, "zpool_status_start.txt"), "w") as f:
        f.write(zpool_status_text(pools))
    with open(os.path.join(outdir, "zpool_history.txt"), "w") as f:
        f.write(zpool_history_text(pools))
    with open(os.path.join(outdir, "zfs_properties.json"), "w") as f:
        json.dump(zfs_get_properties(pools), f, indent=2)

    bpf_handle = None
    bpf_note = None
    if not args.no_bpf:
        bpf_handle, bpf_note = start_bpftrace(outdir, track_files=args.track_files)
    else:
        bpf_note = "bpftrace wyłączony przez --no-bpf"
    if bpf_note:
        oprint(bpf_note)

    proc_prev = read_proc_snapshot()
    arc_prev = parse_arcstats()
    samples_by_pool = defaultdict(list)
    arc_samples = []
    deadline = time.time() + args.duration
    rounds = max(1, int(math.ceil(float(args.duration) / float(args.interval))))
    proc_accum = {}

    for n in range(rounds):
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        # Shrink the last interval so we never overshoot the deadline.
        interval = min(args.interval, max(1, int(round(remaining))))
        oprint("[{0}/{1}] próbka, interwał {2}s...".format(n + 1, rounds, interval))
        try:
            rows = parse_zpool_iostat_once(pools, interval)
        except Exception as ex:
            eprint("Błąd zpool iostat: {0}".format(ex))
            break
        for row in rows:
            samples_by_pool[row["name"]].append(row)
        proc_cur = read_proc_snapshot()
        diff_proc_snapshots(proc_prev, proc_cur, proc_accum)
        proc_prev = proc_cur
        arc_cur = parse_arcstats()
        delta = arc_delta(arc_prev, arc_cur)
        if delta is not None:
            arc_samples.append(delta)
        arc_prev = arc_cur
        oprint("[{0}/{1}] gotowe".format(n + 1, rounds))

    if bpf_handle:
        oprint("Kończę bpftrace...")
        stop_bpftrace(bpf_handle)

    with open(os.path.join(outdir, "zpool_status_end.txt"), "w") as f:
        f.write(zpool_status_text(pools))
    with open(os.path.join(outdir, "samples_zpool.json"), "w") as f:
        json.dump(samples_by_pool, f, indent=2)
    with open(os.path.join(outdir, "samples_arc.json"), "w") as f:
        json.dump(arc_samples, f, indent=2)
    with open(os.path.join(outdir, "proc_totals.json"), "w") as f:
        json.dump(proc_accum, f, indent=2)

    sample_summary = summarize_samples(samples_by_pool)
    bpf = parse_bpftrace_output(bpf_handle["out_path"] if bpf_handle else "")
    proc_rows = list(proc_accum.values())

    top_proc_read = []
    for e in top_entries(proc_rows, "read_bytes", args.top):
        top_proc_read.append({
            "proc": format_proc_entry(e),
            "read": human_bytes(e["read_bytes"]),
            "write": human_bytes(e["write_bytes"]),
            "max_read_interval": human_bytes(e["max_interval_read_bytes"]),
            "syscr": e["syscr"],
        })
    top_proc_write = []
    for e in top_entries(proc_rows, "write_bytes", args.top):
        top_proc_write.append({
            "proc": format_proc_entry(e),
            "write": human_bytes(e["write_bytes"]),
            "read": human_bytes(e["read_bytes"]),
            "max_write_interval": human_bytes(e["max_interval_write_bytes"]),
            "syscw": e["syscw"],
        })

    oprint()
    oprint("Wyniki zapisane w: {0}".format(outdir))
    oprint("Pule: {0}".format(", ".join(pools)))
    oprint("Czas zbierania: {0}s, interwał: {1}s, próbek: {2}".format(
        args.duration, args.interval, sum(len(v) for v in samples_by_pool.values())
    ))
    oprint()

    pool_rows = []
    for pool in pools:
        s = sample_summary.get(pool, {})
        pool_rows.append({
            "pool": pool,
            "avg_read/s": human_bytes(s.get("read_avg", 0)),
            "avg_write/s": human_bytes(s.get("write_avg", 0)),
            "p95_read/s": human_bytes(s.get("read_p95", 0)),
            "p95_write/s": human_bytes(s.get("write_p95", 0)),
            "max_read/s": human_bytes(s.get("read_max", 0)),
            "max_write/s": human_bytes(s.get("write_max", 0)),
            "avg_total_wait": human_ns_to_ms(s.get("total_wait_avg_ns", 0)),
            "avg_disk_wait": human_ns_to_ms(s.get("disk_wait_avg_ns", 0)),
        })
    print_table("Podsumowanie puli", pool_rows, [
        ("pool", "pool"),
        ("avg_read/s", "avg_read/s"),
        ("avg_write/s", "avg_write/s"),
        ("p95_read/s", "p95_read/s"),
        ("p95_write/s", "p95_write/s"),
        ("avg_total_wait", "avg_total_wait"),
        ("avg_disk_wait", "avg_disk_wait"),
    ])
    print_table("Top procesy wg read_bytes z /proc//io", top_proc_read, [
        ("proc", "proc"),
        ("read", "read"),
        ("write", "write"),
        ("max_read_interval", "max_read_interval"),
        ("syscr", "syscr"),
    ])
    print_table("Top procesy wg write_bytes z /proc//io", top_proc_write, [
        ("proc", "proc"),
        ("write", "write"),
        ("read", "read"),
        ("max_write_interval", "max_write_interval"),
        ("syscw", "syscw"),
    ])

    if arc_samples:
        hits = sum(x.get("hits", 0) for x in arc_samples)
        misses = sum(x.get("misses", 0) for x in arc_samples)
        total = hits + misses
        hitp = (100.0 * hits / total) if total else 0.0
        l2_hits = sum(x.get("l2_hits", 0) for x in arc_samples)
        l2_misses = sum(x.get("l2_misses", 0) for x in arc_samples)
        l2_total = l2_hits + l2_misses
        l2p = (100.0 * l2_hits / l2_total) if l2_total else 0.0
        oprint("ARC/L2ARC")
        oprint(" ARC hit rate: {0:.2f}% (hits={1}, misses={2})".format(hitp, hits, misses))
        oprint(" L2ARC hit rate: {0:.2f}% (hits={1}, misses={2})".format(l2p, l2_hits, l2_misses))
        oprint()

    if bpf:
        rb = bpf.get("READ_BYTES_BY_COMM", {})
        if rb:
            rows = []
            for comm, value in sorted(rb.items(), key=lambda kv: kv[1], reverse=True)[:args.top]:
                rows.append({"comm": comm, "read_bytes": human_bytes(value)})
            print_table("Top comm wg read() bajtów z bpftrace", rows, [
                ("comm", "comm"),
                ("read_bytes", "read_bytes"),
            ])
        bb = bpf.get("BLOCK_BYTES_BY_COMM", {})
        if bb:
            rows = []
            for comm, value in sorted(bb.items(), key=lambda kv: kv[1], reverse=True)[:args.top]:
                rows.append({"comm": comm, "block_bytes": human_bytes(value)})
            print_table("Top comm wg block_rq_issue bajtów z bpftrace", rows, [
                ("comm", "comm"),
                ("block_bytes", "block_bytes"),
            ])
        opens = bpf.get("OPENS", {})
        if opens:
            rows = []
            for path, value in sorted(opens.items(), key=lambda kv: kv[1], reverse=True)[:args.top]:
                rows.append({"path": path, "opens": value})
            print_table("Top otwierane pliki z bpftrace", rows, [
                ("path", "path"),
                ("opens", "opens"),
            ])

    oprint("Najważniejsze pliki wynikowe:")
    for name in [
        "zpool_status_start.txt",
        "zpool_status_end.txt",
        "zpool_history.txt",
        "zfs_properties.json",
        "samples_zpool.json",
        "samples_arc.json",
        "proc_totals.json",
        "bpftrace.txt",
    ]:
        path = os.path.join(outdir, name)
        if os.path.exists(path):
            oprint(" {0}".format(path))


if __name__ == "__main__":
    main()