#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# example: python3 zfs_probe.py --duration 10 --interval 1 --track-files

import argparse
import ast
import json
import math
import os
import re
import shutil
import signal
import statistics
import subprocess
import sys
import time
from collections import defaultdict
from datetime import datetime


def eprint(msg):
    """Write *msg* to stderr, newline-terminated, flushing immediately."""
    sys.stderr.write(str(msg) + "\n")
    sys.stderr.flush()


def oprint(msg=""):
    """Write *msg* to stdout, newline-terminated, flushing immediately."""
    sys.stdout.write(str(msg) + "\n")
    sys.stdout.flush()


def human_bytes(n):
    """Format a byte count as a short human-readable string; '-' for None."""
    if n is None:
        return "-"
    neg = n < 0
    size = abs(float(n))
    units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
    idx = 0
    # Scale down by 1024 until the value fits, capping at the largest unit.
    while size >= 1024.0 and idx < len(units) - 1:
        size /= 1024.0
        idx += 1
    unit = units[idx]
    if unit == "B":
        text = "{0} B".format(int(size))
    else:
        text = "{0:.1f} {1}".format(size, unit)
    return "-" + text if neg else text


def human_ns_to_ms(n):
    """Render nanoseconds as milliseconds with two decimals; '-' on bad input."""
    if n is None:
        return "-"
    try:
        return "{0:.2f} ms".format(float(n) / 1000000.0)
    except Exception:
        return "-"


def mean_or_zero(values):
    """Arithmetic mean of *values*, or 0.0 for an empty sequence."""
    if not values:
        return 0.0
    return statistics.mean(values)


def percentile(values, q):
    """Linear-interpolated q-quantile (q in [0, 1]) of *values*; 0.0 if empty."""
    if not values:
        return 0.0
    if len(values) == 1:
        return float(values[0])
    ordered = sorted(values)
    rank = (len(ordered) - 1) * q
    lower = int(math.floor(rank))
    upper = int(math.ceil(rank))
    if lower == upper:
        return float(ordered[lower])
    weight = rank - lower
    return ordered[lower] * (1.0 - weight) + ordered[upper] * weight


def run_cmd(cmd, timeout=None):
    """Run *cmd* (argv list) capturing text output.

    Returns (returncode, stdout, stderr); never raises on non-zero exit.
    """
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        timeout=timeout,
        check=False,
    )
    return completed.returncode, completed.stdout, completed.stderr


def command_exists(name):
    """True when *name* resolves to an executable on PATH."""
    return shutil.which(name) is not None


def read_file(path, binary=False):
    """Read and return the whole file, as bytes when *binary* is True."""
    with open(path, "rb" if binary else "r") as handle:
        return handle.read()
def safe_read_text(path):
    """Best-effort text read: return the file's contents or '' on any error."""
    try:
        return read_file(path, binary=False)
    except Exception:
        return ""


def ensure_dir(path):
    """Create *path* (including parents) if it does not exist.

    Uses makedirs(exist_ok=True): the original isdir()+makedirs() pair was
    racy — a concurrent creation between the check and the call raised
    FileExistsError.
    """
    os.makedirs(path, exist_ok=True)


def parse_proc_io_text(text):
    """Parse /proc/<pid>/io text into a {counter_name: int} dict.

    Lines without ':' or with a non-integer value are skipped.
    """
    out = {}
    for line in text.splitlines():
        if ":" not in line:
            continue
        k, v = line.split(":", 1)
        try:
            out[k.strip()] = int(v.strip())
        except ValueError:
            continue
    return out


def read_proc_snapshot():
    """Snapshot per-process I/O counters from /proc.

    Returns {"<pid>:<starttime>": {...}} so a recycled PID is not confused
    with the same process between samples. Processes that vanish mid-read or
    are unreadable (permissions) are skipped — this is best-effort by design.
    """
    snap = {}
    for pid in os.listdir("/proc"):
        if not pid.isdigit():
            continue
        base = os.path.join("/proc", pid)
        try:
            io_txt = read_file(os.path.join(base, "io"))
            stat_txt = read_file(os.path.join(base, "stat"))
            comm = safe_read_text(os.path.join(base, "comm")).strip() or "?"
            cmdline_raw = read_file(os.path.join(base, "cmdline"), binary=True)
            cmdline = cmdline_raw.replace(b"\x00", b" ").decode("utf-8", "replace").strip()
            # /proc/<pid>/stat embeds comm in parentheses, and comm may itself
            # contain spaces or ')' — a plain split() would then shift every
            # field. Per proc(5), parse relative to the LAST ')': fields
            # 3..22 follow it, so starttime (field 22) is at index 19.
            after_comm = stat_txt.rpartition(")")[2].split()
            starttime = int(after_comm[19])
            io = parse_proc_io_text(io_txt)
            key = "{0}:{1}".format(pid, starttime)
            snap[key] = {
                "pid": int(pid),
                "starttime": starttime,
                "comm": comm,
                "cmdline": cmdline,
                "io": io,
            }
        except (FileNotFoundError, ProcessLookupError, PermissionError, IndexError, ValueError):
            continue
        except Exception:
            # Any other transient /proc hiccup: skip this process, keep going.
            continue
    return snap


def diff_proc_snapshots(prev, curr, accum):
    """Accumulate per-process I/O deltas between two snapshots into *accum*.

    Only processes present in both snapshots (same pid:starttime key) count.
    Negative deltas (counter resets) are clamped to 0. Also records the
    largest single-interval read/write burst seen per process.
    """
    tracked = ["rchar", "wchar", "syscr", "syscw", "read_bytes", "write_bytes", "cancelled_write_bytes"]
    for key, cur in curr.items():
        if key not in prev:
            continue
        old = prev[key]
        entry = accum.setdefault(key, {
            "pid": cur["pid"],
            "comm": cur["comm"],
            "cmdline": cur["cmdline"],
            "samples": 0,
            "rchar": 0,
            "wchar": 0,
            "syscr": 0,
            "syscw": 0,
            "read_bytes": 0,
            "write_bytes": 0,
            "cancelled_write_bytes": 0,
            "max_interval_read_bytes": 0,
            "max_interval_write_bytes": 0,
        })
        entry["samples"] += 1
        if cur.get("cmdline"):
            # Keep the freshest non-empty cmdline (it can appear late, e.g.
            # after an exec).
            entry["cmdline"] = cur["cmdline"]
        for k in tracked:
            delta = cur["io"].get(k, 0) - old["io"].get(k, 0)
            if delta < 0:
                delta = 0
            entry[k] += delta
            if k == "read_bytes" and delta > entry["max_interval_read_bytes"]:
                entry["max_interval_read_bytes"] = delta
            if k == "write_bytes" and delta > entry["max_interval_write_bytes"]:
                entry["max_interval_write_bytes"] = delta
def get_pools(pool_arg=None):
    """Return ZFS pool names, optionally filtered by a comma-separated list."""
    rc, out, err = run_cmd(["zpool", "list", "-H", "-o", "name"], timeout=20)
    if rc != 0:
        raise RuntimeError("Nie mogę pobrać listy puli: {0}".format(err.strip()))
    pools = [line.strip() for line in out.splitlines() if line.strip()]
    if pool_arg:
        wanted = {p.strip() for p in pool_arg.split(",") if p.strip()}
        pools = [p for p in pools if p in wanted]
    return pools


def zpool_status_text(pools):
    """Capture `zpool status` output for the given pools (stdout only)."""
    _, out, _ = run_cmd(["zpool", "status"] + pools, timeout=30)
    return out


def zpool_history_text(pools):
    """Capture `zpool history -il`; return a note string when unavailable."""
    if not pools:
        return ""
    rc, out, err = run_cmd(["zpool", "history", "-il"] + pools, timeout=60)
    if rc == 0:
        return out
    return "zpool history niedostępne: {0}\n".format(err.strip())


def zfs_get_properties(pools):
    """Collect selected dataset properties recursively.

    Returns a list of {name, property, value} dicts, or a single-element
    [{"error": ...}] list when the `zfs get` call fails.
    """
    props = "atime,relatime,primarycache,secondarycache,prefetch,recordsize,mountpoint"
    cmd = ["zfs", "get", "-H", "-r", "-o", "name,property,value", props] + pools
    rc, out, err = run_cmd(cmd, timeout=60)
    if rc != 0:
        return [{"error": err.strip()}]
    rows = []
    for line in out.splitlines():
        fields = line.split("\t")
        if len(fields) >= 3:
            rows.append({"name": fields[0], "property": fields[1], "value": fields[2]})
    return rows


def parse_arcstats():
    """Parse /proc/spl/kstat/zfs/arcstats into {name: int}.

    Returns None when the kstat file is absent or yields nothing.
    """
    path = "/proc/spl/kstat/zfs/arcstats"
    if not os.path.exists(path):
        return None
    data = {}
    for line in safe_read_text(path).splitlines():
        fields = line.split()
        if len(fields) < 3 or fields[0] in ("name", "class"):
            continue
        try:
            data[fields[0]] = int(fields[2])
        except Exception:
            continue
    return data or None


def arc_delta(prev, curr):
    """Non-negative deltas of selected ARC/L2ARC counters between two reads.

    Returns None when either snapshot is missing/empty; negative deltas
    (counter resets) are clamped to 0.
    """
    if not prev or not curr:
        return None
    keys = [
        "hits", "misses",
        "demand_data_hits", "demand_data_misses",
        "demand_metadata_hits", "demand_metadata_misses",
        "prefetch_data_hits", "prefetch_data_misses",
        "prefetch_metadata_hits", "prefetch_metadata_misses",
        "l2_hits", "l2_misses",
    ]
    return {k: max(0, curr.get(k, 0) - prev.get(k, 0)) for k in keys}
def parse_zpool_iostat_once(pool_list, interval):
    """
    Collect one `zpool iostat` sample for the given pools.

    Works across zpool versions: tries the latency columns (-l) first,
    then falls back to the plain form. Does not assume a leading
    timestamp column; when absent, the local wall clock is used.
    Returns a list of per-pool row dicts.
    """
    attempts = [
        ["zpool", "iostat", "-H", "-p", "-y", "-l"] + pool_list + [str(interval), "1"],
        ["zpool", "iostat", "-H", "-p", "-y"] + pool_list + [str(interval), "1"],
    ]

    failure = ""
    captured = ""
    for cmd in attempts:
        rc, stdout, stderr = run_cmd(cmd, timeout=interval + 20)
        if rc == 0 and stdout.strip():
            captured = stdout
            break
        failure = (stderr or stdout or "").strip()
        captured = ""
    if not captured.strip():
        raise RuntimeError("zpool iostat failed: {0}".format(failure))

    now_ts = int(time.time())
    rows = []
    latency_cols = [
        "total_wait_ns", "disk_wait_ns", "syncq_wait_ns",
        "asyncq_wait_ns", "scrub_wait_ns", "trim_wait_ns", "rebuild_wait_ns",
    ]

    for raw in captured.splitlines():
        raw = raw.strip()
        if not raw:
            continue
        fields = raw.split("\t") if "\t" in raw else raw.split()
        if len(fields) < 7:
            continue

        # An optional epoch timestamp may precede the pool name.
        if fields[0].isdigit():
            ts = int(fields[0])
            data = fields[1:]
        else:
            ts = now_ts
            data = fields

        if len(data) < 7:
            continue

        try:
            row = {
                "ts": ts,
                "name": data[0],
                "alloc": int(data[1]),
                "free": int(data[2]),
                "rops": int(data[3]),
                "wops": int(data[4]),
                "rbytes": int(data[5]),
                "wbytes": int(data[6]),
            }
        except ValueError:
            continue

        # Latency columns only exist when the -l form succeeded; unparsable
        # cells degrade to 0.
        for offset, col in enumerate(latency_cols, start=7):
            if offset < len(data):
                try:
                    row[col] = int(data[offset])
                except ValueError:
                    row[col] = 0

        rows.append(row)

    return rows


def summarize_samples(samples_by_pool):
    """Per-pool throughput/ops/latency aggregates (avg, p95, max) over samples."""
    summary = {}
    for pool, samples in samples_by_pool.items():
        reads = [s["rbytes"] for s in samples]
        writes = [s["wbytes"] for s in samples]
        read_ops = [s["rops"] for s in samples]
        write_ops = [s["wops"] for s in samples]
        total_waits = [s.get("total_wait_ns", 0) for s in samples]
        disk_waits = [s.get("disk_wait_ns", 0) for s in samples]
        summary[pool] = {
            "samples": len(samples),
            "read_avg": mean_or_zero(reads),
            "write_avg": mean_or_zero(writes),
            "read_p95": percentile(reads, 0.95),
            "write_p95": percentile(writes, 0.95),
            "read_max": max(reads) if reads else 0,
            "write_max": max(writes) if writes else 0,
            "rops_avg": mean_or_zero(read_ops),
            "wops_avg": mean_or_zero(write_ops),
            "total_wait_avg_ns": mean_or_zero(total_waits),
            "disk_wait_avg_ns": mean_or_zero(disk_waits),
            "total_wait_p95_ns": percentile(total_waits, 0.95),
            "disk_wait_p95_ns": percentile(disk_waits, 0.95),
        }
    return summary
def top_entries(entries, key, topn):
    """Top *topn* dicts by *key*, descending; entries with value <= 0 excluded."""
    positive = [item for item in entries if item.get(key, 0) > 0]
    return sorted(positive, key=lambda item: item.get(key, 0), reverse=True)[:topn]


def start_bpftrace(outdir, track_files=False):
    """Launch a background bpftrace collector writing into *outdir*.

    Returns (handle, note): *handle* is a dict with the Popen object and
    file paths, or None when bpftrace cannot run, in which case *note*
    says why. With *track_files*, openat() paths are tracked as well.
    """
    if os.geteuid() != 0:
        return None, "bpftrace pominięty: uruchom jako root."
    if not command_exists("bpftrace"):
        return None, "bpftrace pominięty: brak polecenia bpftrace."

    program = [
        'tracepoint:syscalls:sys_exit_read /args.ret > 0/ { @read_bytes_by_comm[comm] = sum(args.ret); @read_calls_by_comm[comm] = count(); }',
        'tracepoint:block:block_rq_issue { @block_bytes_by_comm[comm] = sum(args.bytes); @block_ios_by_comm[comm] = count(); }',
    ]
    if track_files:
        program.append('tracepoint:syscalls:sys_enter_openat { @opens[str(args.filename)] = count(); @opens_by_comm[comm, str(args.filename)] = count(); }')
    # The END block dumps every map under a ===SECTION=== marker that
    # parse_bpftrace_output() later splits on.
    program.append('END {')
    program.append(' printf("===READ_BYTES_BY_COMM===\\n"); print(@read_bytes_by_comm);')
    program.append(' printf("===READ_CALLS_BY_COMM===\\n"); print(@read_calls_by_comm);')
    program.append(' printf("===BLOCK_BYTES_BY_COMM===\\n"); print(@block_bytes_by_comm);')
    program.append(' printf("===BLOCK_IOS_BY_COMM===\\n"); print(@block_ios_by_comm);')
    if track_files:
        program.append(' printf("===OPENS===\\n"); print(@opens);')
        program.append(' printf("===OPENS_BY_COMM===\\n"); print(@opens_by_comm);')
    program.append('}')

    bt_path = os.path.join(outdir, "trace.bt")
    with open(bt_path, "w") as script:
        script.write("\n".join(program) + "\n")

    out_path = os.path.join(outdir, "bpftrace.txt")
    # Kept open for the child's lifetime; closed later by stop_bpftrace().
    out_f = open(out_path, "w")
    proc = subprocess.Popen(
        ["bpftrace", bt_path],
        stdout=out_f,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    return {"proc": proc, "out_f": out_f, "out_path": out_path, "bt_path": bt_path}, None
# ===SECTION=== markers emitted by the bpftrace END block.
SECTION_RE = re.compile(r"^===([A-Z0-9_]+)===$")
# bpftrace map dump lines: '@map_name[key]: value'.
MAP_LINE_RE = re.compile(r'^@[^[]+\[(.*)\]:\s+(-?\d+)$')


def _decode_bpf_key(key_raw):
    """Best-effort decode of a bpftrace map key.

    Quoted strings and tuples parse via ast.literal_eval; a bare word is
    retried as a 1-tuple, and as a last resort the raw text with surrounding
    double quotes stripped is returned.
    """
    try:
        return ast.literal_eval(key_raw)
    except Exception:
        pass
    try:
        return ast.literal_eval("({0},)".format(key_raw))
    except Exception:
        return key_raw.strip('"')


def parse_bpftrace_output(path):
    """Parse bpftrace END-block map dumps into {section: {key: int}}.

    Missing or empty *path* yields an empty mapping; lines outside a
    recognized section are ignored.
    """
    result = defaultdict(dict)
    if not path or not os.path.exists(path):
        return result
    section = None
    for raw in safe_read_text(path).splitlines():
        line = raw.strip()
        header = SECTION_RE.match(line)
        if header:
            section = header.group(1)
            continue
        entry = MAP_LINE_RE.match(line)
        if not entry or not section:
            continue
        key_raw = entry.group(1).strip()
        result[section][_decode_bpf_key(key_raw)] = int(entry.group(2))
    return result
def format_proc_entry(e):
    """One-line label for a process entry: comm, pid and a truncated cmdline."""
    label = e.get("cmdline") or e.get("comm") or "?"
    if len(label) > 120:
        label = label[:117] + "..."
    return '{0} [pid {1}] {2}'.format(e.get("comm", "?"), e.get("pid", "?"), label)


def print_table(title, rows, cols):
    """Print a left-aligned ASCII table; *cols* is a list of (header, key) pairs."""
    oprint(title)
    if not rows:
        oprint(" brak danych")
        oprint()
        return
    widths = [
        max([len(header)] + [len(str(row.get(key, ""))) for row in rows])
        for header, key in cols
    ]
    oprint(" " + " ".join(header.ljust(widths[i]) for i, (header, _) in enumerate(cols)))
    oprint(" " + " ".join("-" * w for w in widths))
    for row in rows:
        oprint(" " + " ".join(str(row.get(key, "")).ljust(widths[i]) for i, (_, key) in enumerate(cols)))
    oprint()


def stop_bpftrace(handle):
    """Stop the bpftrace child (SIGINT first so its END block runs) and close output."""
    if not handle:
        return
    proc = handle["proc"]
    if proc.poll() is not None:
        handle["out_f"].close()
        return

    # Escalate: SIGINT (lets END run) -> terminate -> kill; give each a grace
    # period and swallow failures — best-effort shutdown.
    for stop, grace in (
        (lambda: proc.send_signal(signal.SIGINT), 3),
        (proc.terminate, 2),
        (proc.kill, 2),
    ):
        try:
            stop()
            proc.wait(timeout=grace)
            break
        except Exception:
            continue
    try:
        handle["out_f"].close()
    except Exception:
        pass


def main():
    """CLI entry point: sample ZFS/ARC/process I/O for a while, then report."""
    parser = argparse.ArgumentParser(description="Zbiera próbki ZFS/ARC/procesów przez zadany czas i pokazuje top statystyki.")
    parser.add_argument("--duration", type=int, default=3600, help="Czas zbierania w sekundach, np. 7200")
    parser.add_argument("--interval", type=int, default=5, help="Interwał próbek w sekundach")
    parser.add_argument("--pool", default="", help="Opcjonalnie: jedna lub kilka puli, rozdzielone przecinkiem")
    parser.add_argument("--top", type=int, default=15, help="Ile pozycji pokazać w topkach")
    parser.add_argument("--outdir", default="", help="Katalog na logi i JSON")
    parser.add_argument("--track-files", action="store_true", help="Śledź top otwierane pliki przez bpftrace")
    parser.add_argument("--no-bpf", action="store_true", help="Nie uruchamiaj bpftrace nawet gdy jest dostępny")
    args = parser.parse_args()

    if args.interval <= 0 or args.duration <= 0:
        eprint("duration i interval muszą być > 0")
        sys.exit(2)

    if os.geteuid() != 0:
        eprint("Uwaga: bez roota część /proc i bpftrace może być niepełna.")

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    outdir = args.outdir or os.path.abspath("./zfs_probe_{0}".format(stamp))
    ensure_dir(outdir)

    oprint("Start. Zbieram dane do: {0}".format(outdir))
    pools = get_pools(args.pool)
    if not pools:
        eprint("Nie znaleziono żadnych puli ZFS.")
        sys.exit(1)

    oprint("Pule: {0}".format(", ".join(pools)))
    meta = {
        "started_at": datetime.now().isoformat(),
        "duration": args.duration,
        "interval": args.interval,
        "pools": pools,
        "hostname": os.uname().nodename,
        "uid": os.geteuid(),
    }
    with open(os.path.join(outdir, "meta.json"), "w") as fh:
        json.dump(meta, fh, indent=2)

    # One-shot context captures before sampling starts.
    with open(os.path.join(outdir, "zpool_status_start.txt"), "w") as fh:
        fh.write(zpool_status_text(pools))
    with open(os.path.join(outdir, "zpool_history.txt"), "w") as fh:
        fh.write(zpool_history_text(pools))
    with open(os.path.join(outdir, "zfs_properties.json"), "w") as fh:
        json.dump(zfs_get_properties(pools), fh, indent=2)

    bpf_handle = None
    bpf_note = "bpftrace wyłączony przez --no-bpf" if args.no_bpf else None
    if not args.no_bpf:
        bpf_handle, bpf_note = start_bpftrace(outdir, track_files=args.track_files)
    if bpf_note:
        oprint(bpf_note)

    proc_prev = read_proc_snapshot()
    arc_prev = parse_arcstats()

    samples_by_pool = defaultdict(list)
    arc_samples = []
    deadline = time.time() + args.duration
    total_rounds = max(1, int(math.ceil(float(args.duration) / float(args.interval))))
    proc_accum = {}

    for round_no in range(total_rounds):
        time_left = deadline - time.time()
        if time_left <= 0:
            break
        # Clamp the final sample so we never overshoot the deadline.
        step = min(args.interval, max(1, int(round(time_left))))
        oprint("[{0}/{1}] próbka, interwał {2}s...".format(round_no + 1, total_rounds, step))
        try:
            sample_rows = parse_zpool_iostat_once(pools, step)
        except Exception as exc:
            eprint("Błąd zpool iostat: {0}".format(exc))
            break

        for row in sample_rows:
            samples_by_pool[row["name"]].append(row)

        proc_now = read_proc_snapshot()
        diff_proc_snapshots(proc_prev, proc_now, proc_accum)
        proc_prev = proc_now

        arc_now = parse_arcstats()
        arc_diff = arc_delta(arc_prev, arc_now)
        if arc_diff is not None:
            arc_samples.append(arc_diff)
        arc_prev = arc_now

        oprint("[{0}/{1}] gotowe".format(round_no + 1, total_rounds))

    if bpf_handle:
        oprint("Kończę bpftrace...")
        stop_bpftrace(bpf_handle)

    with open(os.path.join(outdir, "zpool_status_end.txt"), "w") as fh:
        fh.write(zpool_status_text(pools))

    with open(os.path.join(outdir, "samples_zpool.json"), "w") as fh:
        json.dump(samples_by_pool, fh, indent=2)
    with open(os.path.join(outdir, "samples_arc.json"), "w") as fh:
        json.dump(arc_samples, fh, indent=2)
    with open(os.path.join(outdir, "proc_totals.json"), "w") as fh:
        json.dump(proc_accum, fh, indent=2)

    sample_summary = summarize_samples(samples_by_pool)
    bpf = parse_bpftrace_output(bpf_handle["out_path"] if bpf_handle else "")

    proc_rows = list(proc_accum.values())

    top_proc_read = [
        {
            "proc": format_proc_entry(entry),
            "read": human_bytes(entry["read_bytes"]),
            "write": human_bytes(entry["write_bytes"]),
            "max_read_interval": human_bytes(entry["max_interval_read_bytes"]),
            "syscr": entry["syscr"],
        }
        for entry in top_entries(proc_rows, "read_bytes", args.top)
    ]

    top_proc_write = [
        {
            "proc": format_proc_entry(entry),
            "write": human_bytes(entry["write_bytes"]),
            "read": human_bytes(entry["read_bytes"]),
            "max_write_interval": human_bytes(entry["max_interval_write_bytes"]),
            "syscw": entry["syscw"],
        }
        for entry in top_entries(proc_rows, "write_bytes", args.top)
    ]

    oprint()
    oprint("Wyniki zapisane w: {0}".format(outdir))
    oprint("Pule: {0}".format(", ".join(pools)))
    oprint("Czas zbierania: {0}s, interwał: {1}s, próbek: {2}".format(
        args.duration, args.interval, sum(len(v) for v in samples_by_pool.values())
    ))
    oprint()

    pool_table = []
    for pool in pools:
        stats = sample_summary.get(pool, {})
        pool_table.append({
            "pool": pool,
            "avg_read/s": human_bytes(stats.get("read_avg", 0)),
            "avg_write/s": human_bytes(stats.get("write_avg", 0)),
            "p95_read/s": human_bytes(stats.get("read_p95", 0)),
            "p95_write/s": human_bytes(stats.get("write_p95", 0)),
            "max_read/s": human_bytes(stats.get("read_max", 0)),
            "max_write/s": human_bytes(stats.get("write_max", 0)),
            "avg_total_wait": human_ns_to_ms(stats.get("total_wait_avg_ns", 0)),
            "avg_disk_wait": human_ns_to_ms(stats.get("disk_wait_avg_ns", 0)),
        })

    print_table("Podsumowanie puli", pool_table, [
        ("pool", "pool"),
        ("avg_read/s", "avg_read/s"),
        ("avg_write/s", "avg_write/s"),
        ("p95_read/s", "p95_read/s"),
        ("p95_write/s", "p95_write/s"),
        ("avg_total_wait", "avg_total_wait"),
        ("avg_disk_wait", "avg_disk_wait"),
    ])

    print_table("Top procesy wg read_bytes z /proc//io", top_proc_read, [
        ("proc", "proc"),
        ("read", "read"),
        ("write", "write"),
        ("max_read_interval", "max_read_interval"),
        ("syscr", "syscr"),
    ])

    print_table("Top procesy wg write_bytes z /proc//io", top_proc_write, [
        ("proc", "proc"),
        ("write", "write"),
        ("read", "read"),
        ("max_write_interval", "max_write_interval"),
        ("syscw", "syscw"),
    ])

    if arc_samples:
        hits = sum(x.get("hits", 0) for x in arc_samples)
        misses = sum(x.get("misses", 0) for x in arc_samples)
        total = hits + misses
        hitp = (100.0 * hits / total) if total else 0.0
        l2_hits = sum(x.get("l2_hits", 0) for x in arc_samples)
        l2_misses = sum(x.get("l2_misses", 0) for x in arc_samples)
        l2_total = l2_hits + l2_misses
        l2p = (100.0 * l2_hits / l2_total) if l2_total else 0.0
        oprint("ARC/L2ARC")
        oprint(" ARC hit rate: {0:.2f}% (hits={1}, misses={2})".format(hitp, hits, misses))
        oprint(" L2ARC hit rate: {0:.2f}% (hits={1}, misses={2})".format(l2p, l2_hits, l2_misses))
        oprint()

    if bpf:
        read_map = bpf.get("READ_BYTES_BY_COMM", {})
        if read_map:
            table = [
                {"comm": comm, "read_bytes": human_bytes(val)}
                for comm, val in sorted(read_map.items(), key=lambda kv: kv[1], reverse=True)[:args.top]
            ]
            print_table("Top comm wg read() bajtów z bpftrace", table, [
                ("comm", "comm"),
                ("read_bytes", "read_bytes"),
            ])

        block_map = bpf.get("BLOCK_BYTES_BY_COMM", {})
        if block_map:
            table = [
                {"comm": comm, "block_bytes": human_bytes(val)}
                for comm, val in sorted(block_map.items(), key=lambda kv: kv[1], reverse=True)[:args.top]
            ]
            print_table("Top comm wg block_rq_issue bajtów z bpftrace", table, [
                ("comm", "comm"),
                ("block_bytes", "block_bytes"),
            ])

        open_map = bpf.get("OPENS", {})
        if open_map:
            table = [
                {"path": p, "opens": cnt}
                for p, cnt in sorted(open_map.items(), key=lambda kv: kv[1], reverse=True)[:args.top]
            ]
            print_table("Top otwierane pliki z bpftrace", table, [
                ("path", "path"),
                ("opens", "opens"),
            ])

    oprint("Najważniejsze pliki wynikowe:")
    for name in (
        "zpool_status_start.txt",
        "zpool_status_end.txt",
        "zpool_history.txt",
        "zfs_properties.json",
        "samples_zpool.json",
        "samples_arc.json",
        "proc_totals.json",
        "bpftrace.txt",
    ):
        path = os.path.join(outdir, name)
        if os.path.exists(path):
            oprint(" {0}".format(path))


if __name__ == "__main__":
    main()