#!/usr/bin/env python3
"""
rtorrent_cli.py - a simple CLI for bulk management of rTorrent over XML-RPC/SCGI.

Default endpoint: scgi://127.0.0.1:5000

Examples:
  python3 rtorrent_cli.py ping
  python3 rtorrent_cli.py list
  python3 rtorrent_cli.py list --only-stopped --only-complete
  python3 rtorrent_cli.py show HASH
  python3 rtorrent_cli.py start HASH
  python3 rtorrent_cli.py bulk-start --only-stopped --only-complete
  python3 rtorrent_cli.py bulk-stop --name-regex "ubuntu|debian"
  python3 rtorrent_cli.py bulk-announce --only-active
  python3 rtorrent_cli.py bulk-check-hash --only-stopped --name-regex "movie"
  python3 rtorrent_cli.py dump-methods
"""

from __future__ import annotations

import argparse
import json
import re
import socket
import sys
import xmlrpc.client
from dataclasses import dataclass, asdict
from typing import Any, Iterable
from urllib.parse import urlparse


DEFAULT_URL = "scgi://127.0.0.1:5000"


# ----------------------------
# SCGI XML-RPC transport
# ----------------------------

class SCGITransport(xmlrpc.client.Transport):
    """XML-RPC transport that sends each request as an SCGI packet over TCP.

    A new socket is opened per request to ``(host, port)`` given at
    construction time; the ``host`` argument of :meth:`request` is ignored.
    """

    def __init__(self, host: str, port: int, timeout: int = 15):
        super().__init__()
        self.host = host
        self.port = port
        self.timeout = timeout

    @staticmethod
    def _build_packet(handler: str, body: bytes) -> bytes:
        """Return the SCGI packet (netstring of headers, then *body*)."""
        # CONTENT_LENGTH is deliberately emitted first, as in the original
        # header ordering.
        headers = (
            ("CONTENT_LENGTH", str(len(body))),
            ("SCGI", "1"),
            ("REQUEST_METHOD", "POST"),
            ("REQUEST_URI", handler or "/RPC2"),
        )
        header_bytes = b"".join(
            key.encode("utf-8") + b"\x00" + value.encode("utf-8") + b"\x00"
            for key, value in headers
        )
        return (
            str(len(header_bytes)).encode("ascii")
            + b":" + header_bytes + b"," + body
        )

    @staticmethod
    def _strip_http_headers(response: bytes) -> bytes:
        """Drop a leading HTTP-style header section, if there is one.

        rTorrent over SCGI usually returns just the XML body, but some
        proxies may add HTTP headers. A payload that already starts with
        ``<`` is returned untouched so that a blank line inside the XML
        document is never mistaken for the header separator.
        """
        if response.lstrip().startswith(b"<"):
            return response
        _, separator, payload = response.partition(b"\r\n\r\n")
        return payload if separator else response

    def request(self, host: str, handler: str, request_body: bytes, verbose: bool = False):
        """Send one XML-RPC call and return the unmarshalled response.

        Args:
            host: Ignored; the connection target is ``self.host``/``self.port``.
            handler: Request URI placed in the SCGI headers (``/RPC2`` if empty).
            request_body: Marshalled XML-RPC request (``bytes`` or ``str``).
            verbose: Accepted for interface compatibility; unused.

        Raises:
            ConnectionError: The server closed the connection without data.
            OSError: Connecting, sending or receiving failed (incl. timeout).
            xmlrpc.client.Fault: The server returned an XML-RPC fault.
        """
        body = request_body.encode("utf-8") if isinstance(request_body, str) else request_body
        packet = self._build_packet(handler, body)

        with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock:
            sock.sendall(packet)
            # Read until the peer closes; join once instead of growing a
            # bytes object chunk by chunk.
            chunks = list(iter(lambda: sock.recv(65536), b""))

        response = b"".join(chunks)
        if not response:
            raise ConnectionError(
                f"Pusta odpowiedź z serwera SCGI {self.host}:{self.port}"
            )
        return self.parse_response_bytes(self._strip_http_headers(response))

    def parse_response_bytes(self, data: bytes):
        """Feed raw XML bytes through the transport's parser/unmarshaller."""
        parser, unmarshaller = self.getparser()
        parser.feed(data)
        parser.close()
        return unmarshaller.close()


class _TimeoutMixin:
    """Apply a fixed timeout to HTTP(S) connections made by an xmlrpc transport."""

    def __init__(self, timeout: float, **kwargs: Any):
        super().__init__(**kwargs)
        self._timeout = timeout

    def make_connection(self, host):
        connection = super().make_connection(host)
        # The connection object is created lazily-connected, so setting the
        # attribute here takes effect when the socket is actually opened.
        connection.timeout = self._timeout
        return connection


class _TimeoutTransport(_TimeoutMixin, xmlrpc.client.Transport):
    """Plain-HTTP XML-RPC transport honouring a timeout."""


class _TimeoutSafeTransport(_TimeoutMixin, xmlrpc.client.SafeTransport):
    """HTTPS XML-RPC transport honouring a timeout."""


def make_rpc_client(url: str, timeout: int):
    """Build an ``xmlrpc.client.ServerProxy`` for *url*.

    ``scgi://host[:port]`` uses :class:`SCGITransport` (port defaults to
    5000). ``http://`` and ``https://`` URLs use the standard transports with
    *timeout* applied. Any other scheme is handed to ``ServerProxy``
    unchanged, which rejects unsupported protocols itself.

    Raises:
        ValueError: An ``scgi://`` URL has no host part.
    """
    parsed = urlparse(url)

    if parsed.scheme == "scgi":
        if not parsed.hostname:
            raise ValueError("SCGI URL musi mieć host, np. scgi://127.0.0.1:5000")
        transport = SCGITransport(parsed.hostname, parsed.port or 5000, timeout=timeout)
        return xmlrpc.client.ServerProxy(
            "http://rtorrent/RPC2",
            transport=transport,
            allow_none=True,
        )

    http_transport = None
    if parsed.scheme == "http":
        http_transport = _TimeoutTransport(timeout)
    elif parsed.scheme == "https":
        http_transport = _TimeoutSafeTransport(timeout)
    return xmlrpc.client.ServerProxy(url, transport=http_transport, allow_none=True)


# ----------------------------
# Helpers
# ----------------------------

@dataclass
class Torrent:
    """Snapshot of one torrent as returned by a ``d.multicall2`` row."""

    hash: str
    name: str
    state: int  # 0 is treated as stopped, 1 as started/paused (see properties)
    active: int  # 1 is treated as active
    complete: int  # 1 is treated as complete
    size_bytes: int
    completed_bytes: int
    ratio: int  # raw integer value reported by d.ratio
    down_rate: int
    up_rate: int
    message: str

    @property
    def stopped(self) -> bool:
        return self.state == 0

    @property
    def started_or_paused(self) -> bool:
        return self.state == 1

    @property
    def is_active(self) -> bool:
        return self.active == 1

    @property
    def is_complete(self) -> bool:
        return self.complete == 1


def rpc_error(exc: Exception, context: dict[str, Any] | None = None) -> dict[str, Any]:
    """Describe *exc* as a JSON-friendly error payload.

    The ``context`` key is only added when *context* is non-empty.
    """
    payload = {
        "ok": False,
        "error_type": exc.__class__.__name__,
        "error": str(exc),
    }
    if context:
        payload["context"] = context
    return payload


def print_json(data: Any) -> None:
    """Pretty-print *data* as JSON on stdout, keeping non-ASCII characters."""
    print(json.dumps(data, ensure_ascii=False, indent=2, sort_keys=False))


def call_method(rpc, method: str, *args):
    """Invoke the (possibly dotted) RPC *method* on *rpc* with *args*."""
    return getattr(rpc, method)(*args)


def safe_call(rpc, method: str, *args, context: dict[str, Any] | None = None):
    """Call an RPC method without letting exceptions escape.

    Returns:
        ``(True, result)`` on success, or ``(False, error_payload)`` where the
        payload comes from :func:`rpc_error`. When no *context* is given, the
        method name and arguments are recorded instead.
    """
    try:
        return True, call_method(rpc, method, *args)
    except Exception as exc:  # deliberate boundary: every failure becomes data
        return False, rpc_error(exc, context=context or {"method": method, "args": args})


def human_bytes(num: int) -> str:
    """Format a byte count with binary units and one decimal place."""
    value = float(num)
    for unit in ("B", "KiB", "MiB", "GiB", "TiB", "PiB"):
        if abs(value) < 1024:
            return f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.1f} EiB"
# ----------------------------
# rTorrent API
# ----------------------------

# Per-torrent getters requested in one d.multicall2 call; the order matches
# the field order of the Torrent dataclass.
_TORRENT_FIELDS = (
    "d.hash=",
    "d.name=",
    "d.state=",
    "d.is_active=",
    "d.complete=",
    "d.size_bytes=",
    "d.completed_bytes=",
    "d.ratio=",
    "d.down.rate=",
    "d.up.rate=",
    "d.message=",
)


def get_torrent_hashes(rpc, view: str = "main") -> list[str]:
    """Return the info-hashes of every torrent in *view*.

    Raises:
        RuntimeError: The RPC call failed; the message is the JSON error payload.
    """
    ok, result = safe_call(rpc, "d.multicall2", "", view, "d.hash=")
    if not ok:
        raise RuntimeError(json.dumps(result, ensure_ascii=False))

    hashes: list[str] = []
    for row in result:
        # Rows are accepted either as a non-empty list or as a bare string.
        if isinstance(row, list) and row:
            hashes.append(str(row[0]))
        elif isinstance(row, str):
            hashes.append(row)
    return hashes


def list_torrents(rpc, view: str = "main") -> "list[Torrent]":
    """Fetch all torrents of *view* in a single ``d.multicall2`` call.

    Raises:
        RuntimeError: The RPC call failed, or a returned row has fewer
            columns than were requested.
    """
    ok, result = safe_call(rpc, "d.multicall2", "", view, *_TORRENT_FIELDS)
    if not ok:
        raise RuntimeError(json.dumps(result, ensure_ascii=False))

    expected = len(_TORRENT_FIELDS)
    torrents = []
    for row in result:
        if len(row) < expected:
            # Fail with a descriptive message instead of a bare IndexError.
            raise RuntimeError(
                f"Nieoczekiwany wiersz z d.multicall2 (oczekiwano {expected} pól): {row!r}"
            )
        torrents.append(Torrent(
            hash=str(row[0]),
            name=str(row[1]),
            state=int(row[2]),
            active=int(row[3]),
            complete=int(row[4]),
            size_bytes=int(row[5]),
            completed_bytes=int(row[6]),
            ratio=int(row[7]),
            down_rate=int(row[8]),
            up_rate=int(row[9]),
            message=str(row[10]),
        ))
    return torrents


def get_torrent(rpc, hash_: str) -> "Torrent":
    """Return the torrent whose hash equals *hash_* (case-insensitive).

    Raises:
        KeyError: No torrent in the default view has that hash.
        RuntimeError: Listing the torrents failed.
    """
    wanted = hash_.lower()
    for torrent in list_torrents(rpc):
        if torrent.hash.lower() == wanted:
            return torrent
    raise KeyError(f"Nie znaleziono torrenta: {hash_}")


def _compile_filter(option: str, pattern: str) -> re.Pattern[str]:
    """Compile a case-insensitive filter regex, naming *option* on failure."""
    try:
        return re.compile(pattern, re.IGNORECASE)
    except re.error as exc:
        raise ValueError(
            f"Nieprawidłowe wyrażenie regularne dla {option}: {exc}"
        ) from exc


def filter_torrents(torrents: "Iterable[Torrent]", args) -> "list[Torrent]":
    """Return the torrents that satisfy every filter enabled on *args*.

    Filter attributes missing from *args* count as disabled. All enabled
    filters are combined with logical AND, and input order is preserved.

    Raises:
        ValueError: ``name_regex`` or ``hash_regex`` is not a valid pattern.
    """
    predicates = []
    if getattr(args, "only_stopped", False):
        predicates.append(lambda t: t.stopped)
    if getattr(args, "only_started", False):
        predicates.append(lambda t: t.started_or_paused)
    if getattr(args, "only_active", False):
        predicates.append(lambda t: t.is_active)
    if getattr(args, "only_complete", False):
        predicates.append(lambda t: t.is_complete)
    if getattr(args, "only_incomplete", False):
        predicates.append(lambda t: not t.is_complete)

    name_regex = getattr(args, "name_regex", None)
    if name_regex:
        name_pattern = _compile_filter("--name-regex", name_regex)
        predicates.append(lambda t: name_pattern.search(t.name) is not None)

    hash_regex = getattr(args, "hash_regex", None)
    if hash_regex:
        hash_pattern = _compile_filter("--hash-regex", hash_regex)
        predicates.append(lambda t: hash_pattern.search(t.hash) is not None)

    # One pass over the input instead of rebuilding the list per filter.
    return [t for t in torrents if all(check(t) for check in predicates)]


def torrent_to_dict(t: "Torrent") -> dict[str, Any]:
    """Convert a torrent to a dict and add human-readable derived fields."""
    data = asdict(t)
    data["size"] = human_bytes(t.size_bytes)
    data["completed"] = human_bytes(t.completed_bytes)
    data["ratio_float"] = round(t.ratio / 1000, 3)
    return data


# ----------------------------
# Commands
# ----------------------------

def cmd_ping(rpc, args) -> int:
    """Check connectivity; exit code 0 when the server answers."""
    ok, result = safe_call(rpc, "system.client_version")
    if ok:
        print_json({"ok": True, "client_version": result})
        return 0

    # Fallback for older builds.
    ok, result = safe_call(rpc, "system.listMethods")
    print_json({"ok": ok, "result": result})
    return 0 if ok else 1


def cmd_list(rpc, args) -> int:
    """Print the (filtered) torrents of ``args.view`` as JSON."""
    try:
        torrents = filter_torrents(list_torrents(rpc, args.view), args)
        print_json({
            "ok": True,
            "count": len(torrents),
            "torrents": [torrent_to_dict(t) for t in torrents],
        })
        return 0
    except Exception as exc:
        print_json(rpc_error(exc))
        return 1


def cmd_show(rpc, args) -> int:
    """Print the details of the torrent identified by ``args.hash``."""
    try:
        t = get_torrent(rpc, args.hash)
        print_json({"ok": True, "torrent": torrent_to_dict(t)})
        return 0
    except Exception as exc:
        print_json(rpc_error(exc, {"hash": args.hash}))
        return 1


# CLI command name -> RPC method applied to a single torrent hash.
ACTION_METHODS = {
    "start": "d.start",
    "stop": "d.stop",
    "open": "d.open",
    "close": "d.close",
    "check-hash": "d.check_hash",
    "announce": "d.tracker_announce",
}


def cmd_single_action(rpc, args) -> int:
    """Run the action named by ``args.command`` on ``args.hash``."""
    method = ACTION_METHODS[args.command]
    ok, result = safe_call(rpc, method, args.hash, context={"hash": args.hash, "method": method})
    print_json({
        "ok": ok,
        "action": args.command,
        "hash": args.hash,
        "result": result,
    })
    return 0 if ok else 1


# Every single-torrent action has a "bulk-" counterpart using the same RPC
# method; deriving the table keeps the two from drifting apart.
BULK_ACTION_METHODS = {
    f"bulk-{name}": method for name, method in ACTION_METHODS.items()
}


def cmd_bulk_action(rpc, args) -> int:
    """Apply the bulk action named by ``args.command`` to all filtered torrents.

    Returns:
        0 on full success (or dry run), 1 if listing/filtering failed,
        2 if at least one per-torrent call failed.
    """
    try:
        torrents = filter_torrents(list_torrents(rpc, args.view), args)
    except Exception as exc:
        print_json(rpc_error(exc))
        return 1

    method = BULK_ACTION_METHODS[args.command]

    if args.dry_run:
        print_json({
            "ok": True,
            "dry_run": True,
            "action": args.command,
            "method": method,
            "count": len(torrents),
            "targets": [torrent_to_dict(t) for t in torrents],
        })
        return 0

    results = []
    failed = 0
    for t in torrents:
        ok, result = safe_call(
            rpc, method, t.hash,
            context={"hash": t.hash, "name": t.name, "method": method},
        )
        if not ok:
            failed += 1
        results.append({
            "ok": ok,
            "hash": t.hash,
            "name": t.name,
            "result": result,
        })

    print_json({
        "ok": failed == 0,
        "action": args.command,
        "method": method,
        "count": len(torrents),
        "failed": failed,
        "results": results,
    })
    return 0 if failed == 0 else 2


def cmd_bulk_start_complete(rpc, args) -> int:
    """Shortcut for ``bulk-start --only-complete``; rewrites *args* in place."""
    args.only_complete = True
    args.command = "bulk-start"
    return cmd_bulk_action(rpc, args)


def cmd_dump_methods(rpc, args) -> int:
    """Print the RPC methods reported by ``system.listMethods``."""
    ok, result = safe_call(rpc, "system.listMethods")
    print_json({"ok": ok, "methods": result if ok else None, "error": None if ok else result})
    return 0 if ok else 1


# ----------------------------
# CLI
# ----------------------------

def add_filters(parser: argparse.ArgumentParser) -> None:
    """Register the view option and the torrent filter options on *parser*."""
    parser.add_argument("--view", default="main", help="Widok rTorrent, domyślnie: main")
    parser.add_argument("--only-stopped", action="store_true", help="Tylko state=0")
    parser.add_argument("--only-started", action="store_true", help="Tylko state=1")
    parser.add_argument("--only-active", action="store_true", help="Tylko is_active=1")
    parser.add_argument("--only-complete", action="store_true", help="Tylko complete=1")
    parser.add_argument("--only-incomplete", action="store_true", help="Tylko complete=0")
    parser.add_argument("--name-regex", help="Filtr regex po nazwie")
    parser.add_argument("--hash-regex", help="Filtr regex po hashu")


def _add_bulk_options(parser: argparse.ArgumentParser) -> None:
    """Register the options shared by every bulk sub-command."""
    add_filters(parser)
    parser.add_argument("--dry-run", action="store_true", help="Pokaż cele bez wykonywania akcji")


def build_parser() -> argparse.ArgumentParser:
    """Build the top-level argument parser with all sub-commands."""
    parser = argparse.ArgumentParser(
        description="CLI do diagnozy i masowego sterowania rTorrent przez XML-RPC/SCGI."
    )
    parser.add_argument(
        "--url",
        default=DEFAULT_URL,
        help=f"Endpoint RPC, domyślnie: {DEFAULT_URL}",
    )
    parser.add_argument("--timeout", type=int, default=15, help="Timeout połączenia w sekundach")

    sub = parser.add_subparsers(dest="command", required=True)

    sub.add_parser("ping", help="Test połączenia")

    add_filters(sub.add_parser("list", help="Lista torrentów"))

    sub.add_parser("show", help="Szczegóły torrenta").add_argument("hash")

    for name in ACTION_METHODS:
        action_parser = sub.add_parser(name, help=f"Wykonaj {name} na jednym torrencie")
        action_parser.add_argument("hash")

    for name in BULK_ACTION_METHODS:
        _add_bulk_options(sub.add_parser(name, help=f"Masowo wykonaj {name}"))

    _add_bulk_options(
        sub.add_parser("bulk-start-complete", help="Wystartuj kompletne torrenty")
    )

    sub.add_parser("dump-methods", help="Wypisz dostępne metody RPC")

    return parser


def main() -> int:
    """Parse the command line, run the selected command, return its exit code."""
    args = build_parser().parse_args()

    try:
        rpc = make_rpc_client(args.url, args.timeout)
    except Exception as exc:
        print_json(rpc_error(exc, {"url": args.url}))
        return 1

    handlers = {
        "ping": cmd_ping,
        "list": cmd_list,
        "show": cmd_show,
        "bulk-start-complete": cmd_bulk_start_complete,
        "dump-methods": cmd_dump_methods,
    }
    handlers.update(dict.fromkeys(ACTION_METHODS, cmd_single_action))
    handlers.update(dict.fromkeys(BULK_ACTION_METHODS, cmd_bulk_action))

    handler = handlers.get(args.command)
    if handler is None:
        print_json({"ok": False, "error": f"Nieznana komenda: {args.command}"})
        return 1
    return handler(rpc, args)


if __name__ == "__main__":
    sys.exit(main())