From d925f207e15f9b580a6e42634d9d236da8691f84 Mon Sep 17 00:00:00 2001 From: gru Date: Fri, 15 May 2026 10:31:32 +0200 Subject: [PATCH] Add rtorrent_cli.py --- rtorrent_cli.py | 489 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 489 insertions(+) create mode 100644 rtorrent_cli.py diff --git a/rtorrent_cli.py b/rtorrent_cli.py new file mode 100644 index 0000000..7811749 --- /dev/null +++ b/rtorrent_cli.py @@ -0,0 +1,489 @@ +#!/usr/bin/env python3 +""" +rtorrent_cli.py - proste CLI do masowego zarządzania rTorrent przez XML-RPC/SCGI. + +Domyślny endpoint: + scgi://127.0.0.1:5000 + +Przykłady: + python3 rtorrent_cli.py ping + python3 rtorrent_cli.py list + python3 rtorrent_cli.py list --only-stopped --only-complete + python3 rtorrent_cli.py show HASH + python3 rtorrent_cli.py start HASH + python3 rtorrent_cli.py bulk-start --only-stopped --only-complete + python3 rtorrent_cli.py bulk-stop --name-regex "ubuntu|debian" + python3 rtorrent_cli.py bulk-announce --only-active + python3 rtorrent_cli.py bulk-check-hash --only-stopped --name-regex "movie" + python3 rtorrent_cli.py dump-methods +""" + +from __future__ import annotations + +import argparse +import json +import re +import socket +import sys +import xmlrpc.client +from dataclasses import dataclass, asdict +from typing import Any, Iterable +from urllib.parse import urlparse + + +DEFAULT_URL = "scgi://127.0.0.1:5000" + + +# ---------------------------- +# SCGI XML-RPC transport +# ---------------------------- + +class SCGITransport(xmlrpc.client.Transport): + def __init__(self, host: str, port: int, timeout: int = 15): + super().__init__() + self.host = host + self.port = port + self.timeout = timeout + + def request(self, host: str, handler: str, request_body: bytes, verbose: bool = False): + body = request_body.encode("utf-8") if isinstance(request_body, str) else request_body + + headers = { + "CONTENT_LENGTH": str(len(body)), + "SCGI": "1", + "REQUEST_METHOD": "POST", + "REQUEST_URI": handler or "/RPC2", + } + + header_bytes = b"" + for key, value in headers.items(): + header_bytes += key.encode("utf-8") + b"\x00" + value.encode("utf-8") + b"\x00" + + packet = str(len(header_bytes)).encode("ascii") + b":" + header_bytes + b"," + body + + with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock: + sock.sendall(packet) + response = b"" + while True: + chunk = sock.recv(65536) + if not chunk: + break + response += chunk + + # rTorrent przez SCGI zwykle zwraca same XML body, ale niektóre proxy mogą dodać nagłówki HTTP. + if b"\r\n\r\n" in response: + response = response.split(b"\r\n\r\n", 1)[1] + + return self.parse_response_bytes(response) + + def parse_response_bytes(self, data: bytes): + p, u = self.getparser() + p.feed(data) + p.close() + return u.close() + + +def make_rpc_client(url: str, timeout: int): + parsed = urlparse(url) + + if parsed.scheme == "scgi": + if not parsed.hostname: + raise ValueError("SCGI URL musi mieć host, np. scgi://127.0.0.1:5000") + transport = SCGITransport(parsed.hostname, parsed.port or 5000, timeout=timeout) + return xmlrpc.client.ServerProxy( + "http://rtorrent/RPC2", + transport=transport, + allow_none=True, + ) + + return xmlrpc.client.ServerProxy(url, allow_none=True) + + +# ---------------------------- +# Helpers +# ---------------------------- + +@dataclass +class Torrent: + hash: str + name: str + state: int + active: int + complete: int + size_bytes: int + completed_bytes: int + ratio: int + down_rate: int + up_rate: int + message: str + + @property + def stopped(self) -> bool: + return self.state == 0 + + @property + def started_or_paused(self) -> bool: + return self.state == 1 + + @property + def is_active(self) -> bool: + return self.active == 1 + + @property + def is_complete(self) -> bool: + return self.complete == 1 + + +def rpc_error(exc: Exception, context: dict[str, Any] | None = None) -> dict[str, Any]: + payload = { + "ok": False, + "error_type": exc.__class__.__name__, + "error": str(exc), + } + if context: + payload["context"] = context + return payload + + +def print_json(data: Any) -> None: + print(json.dumps(data, ensure_ascii=False, indent=2, sort_keys=False)) + + +def call_method(rpc, method: str, *args): + return getattr(rpc, method)(*args) + + +def safe_call(rpc, method: str, *args, context: dict[str, Any] | None = None): + try: + return True, call_method(rpc, method, *args) + except Exception as exc: + return False, rpc_error(exc, context=context or {"method": method, "args": args}) + + +def human_bytes(num: int) -> str: + value = float(num) + for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]: + if abs(value) < 1024: + return f"{value:.1f} {unit}" + value /= 1024 + return f"{value:.1f} EiB" + + +# ---------------------------- +# rTorrent API +# ---------------------------- + +def get_torrent_hashes(rpc, view: str = "main") -> list[str]: + ok, result = safe_call(rpc, "d.multicall2", "", view, "d.hash=") + if not ok: + raise RuntimeError(json.dumps(result, ensure_ascii=False)) + + hashes: list[str] = [] + for row in result: + if isinstance(row, list) and row: + hashes.append(str(row[0])) + elif isinstance(row, str): + hashes.append(row) + return hashes + + +def list_torrents(rpc, view: str = "main") -> list[Torrent]: + methods = [ + "d.hash=", + "d.name=", + "d.state=", + "d.is_active=", + "d.complete=", + "d.size_bytes=", + "d.completed_bytes=", + "d.ratio=", + "d.down.rate=", + "d.up.rate=", + "d.message=", + ] + + ok, result = safe_call(rpc, "d.multicall2", "", view, *methods) + if not ok: + raise RuntimeError(json.dumps(result, ensure_ascii=False)) + + torrents: list[Torrent] = [] + for row in result: + torrents.append(Torrent( + hash=str(row[0]), + name=str(row[1]), + state=int(row[2]), + active=int(row[3]), + complete=int(row[4]), + size_bytes=int(row[5]), + completed_bytes=int(row[6]), + ratio=int(row[7]), + down_rate=int(row[8]), + up_rate=int(row[9]), + message=str(row[10]), + )) + return torrents + + +def get_torrent(rpc, hash_: str) -> Torrent: + torrents = list_torrents(rpc) + for torrent in torrents: + if torrent.hash.lower() == hash_.lower(): + return torrent + raise KeyError(f"Nie znaleziono torrenta: {hash_}") + + +def filter_torrents(torrents: Iterable[Torrent], args) -> list[Torrent]: + result = list(torrents) + + if getattr(args, "only_stopped", False): + result = [t for t in result if t.stopped] + + if getattr(args, "only_started", False): + result = [t for t in result if t.started_or_paused] + + if getattr(args, "only_active", False): + result = [t for t in result if t.is_active] + + if getattr(args, "only_complete", False): + result = [t for t in result if t.is_complete] + + if getattr(args, "only_incomplete", False): + result = [t for t in result if not t.is_complete] + + if getattr(args, "name_regex", None): + pattern = re.compile(args.name_regex, re.IGNORECASE) + result = [t for t in result if pattern.search(t.name)] + + if getattr(args, "hash_regex", None): + pattern = re.compile(args.hash_regex, re.IGNORECASE) + result = [t for t in result if pattern.search(t.hash)] + + return result + + +def torrent_to_dict(t: Torrent) -> dict[str, Any]: + data = asdict(t) + data["size"] = human_bytes(t.size_bytes) + data["completed"] = human_bytes(t.completed_bytes) + data["ratio_float"] = round(t.ratio / 1000, 3) + return data + + +# ---------------------------- +# Commands +# ---------------------------- + +def cmd_ping(rpc, args) -> int: + ok, result = safe_call(rpc, "system.client_version") + if ok: + print_json({"ok": True, "client_version": result}) + return 0 + + # fallback dla starszych buildów + ok, result = safe_call(rpc, "system.listMethods") + print_json({"ok": ok, "result": result}) + return 0 if ok else 1 + + +def cmd_list(rpc, args) -> int: + try: + torrents = filter_torrents(list_torrents(rpc, args.view), args) + print_json({ + "ok": True, + "count": len(torrents), + "torrents": [torrent_to_dict(t) for t in torrents], + }) + return 0 + except Exception as exc: + print_json(rpc_error(exc)) + return 1 + + +def cmd_show(rpc, args) -> int: + try: + t = get_torrent(rpc, args.hash) + print_json({"ok": True, "torrent": torrent_to_dict(t)}) + return 0 + except Exception as exc: + print_json(rpc_error(exc, {"hash": args.hash})) + return 1 + + +ACTION_METHODS = { + "start": "d.start", + "stop": "d.stop", + "open": "d.open", + "close": "d.close", + "check-hash": "d.check_hash", + "announce": "d.tracker_announce", +} + + +def cmd_single_action(rpc, args) -> int: + method = ACTION_METHODS[args.command] + ok, result = safe_call(rpc, method, args.hash, context={"hash": args.hash, "method": method}) + print_json({ + "ok": ok, + "action": args.command, + "hash": args.hash, + "result": result, + }) + return 0 if ok else 1 + + +BULK_ACTION_METHODS = { + "bulk-start": "d.start", + "bulk-stop": "d.stop", + "bulk-open": "d.open", + "bulk-close": "d.close", + "bulk-check-hash": "d.check_hash", + "bulk-announce": "d.tracker_announce", +} + + +def cmd_bulk_action(rpc, args) -> int: + try: + torrents = filter_torrents(list_torrents(rpc, args.view), args) + except Exception as exc: + print_json(rpc_error(exc)) + return 1 + + method = BULK_ACTION_METHODS[args.command] + results = [] + + if args.dry_run: + print_json({ + "ok": True, + "dry_run": True, + "action": args.command, + "method": method, + "count": len(torrents), + "targets": [torrent_to_dict(t) for t in torrents], + }) + return 0 + + failed = 0 + for t in torrents: + ok, result = safe_call(rpc, method, t.hash, context={"hash": t.hash, "name": t.name, "method": method}) + if not ok: + failed += 1 + results.append({ + "ok": ok, + "hash": t.hash, + "name": t.name, + "result": result, + }) + + print_json({ + "ok": failed == 0, + "action": args.command, + "method": method, + "count": len(torrents), + "failed": failed, + "results": results, + }) + return 0 if failed == 0 else 2 + + +def cmd_bulk_start_complete(rpc, args) -> int: + args.only_complete = True + args.command = "bulk-start" + return cmd_bulk_action(rpc, args) + + +def cmd_dump_methods(rpc, args) -> int: + ok, result = safe_call(rpc, "system.listMethods") + print_json({"ok": ok, "methods": result if ok else None, "error": None if ok else result}) + return 0 if ok else 1 + + +# ---------------------------- +# CLI +# ---------------------------- + +def add_filters(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--view", default="main", help="Widok rTorrent, domyślnie: main") + parser.add_argument("--only-stopped", action="store_true", help="Tylko state=0") + parser.add_argument("--only-started", action="store_true", help="Tylko state=1") + parser.add_argument("--only-active", action="store_true", help="Tylko is_active=1") + parser.add_argument("--only-complete", action="store_true", help="Tylko complete=1") + parser.add_argument("--only-incomplete", action="store_true", help="Tylko complete=0") + parser.add_argument("--name-regex", help="Filtr regex po nazwie") + parser.add_argument("--hash-regex", help="Filtr regex po hashu") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="CLI do diagnozy i masowego sterowania rTorrent przez XML-RPC/SCGI." + ) + parser.add_argument( + "--url", + default=DEFAULT_URL, + help=f"Endpoint RPC, domyślnie: {DEFAULT_URL}", + ) + parser.add_argument("--timeout", type=int, default=15, help="Timeout połączenia w sekundach") + + sub = parser.add_subparsers(dest="command", required=True) + + sub.add_parser("ping", help="Test połączenia") + + p = sub.add_parser("list", help="Lista torrentów") + add_filters(p) + + p = sub.add_parser("show", help="Szczegóły torrenta") + p.add_argument("hash") + + for name in ACTION_METHODS: + p = sub.add_parser(name, help=f"Wykonaj {name} na jednym torrencie") + p.add_argument("hash") + + for name in BULK_ACTION_METHODS: + p = sub.add_parser(name, help=f"Masowo wykonaj {name}") + add_filters(p) + p.add_argument("--dry-run", action="store_true", help="Pokaż cele bez wykonywania akcji") + + p = sub.add_parser("bulk-start-complete", help="Wystartuj kompletne torrenty") + add_filters(p) + p.add_argument("--dry-run", action="store_true", help="Pokaż cele bez wykonywania akcji") + + sub.add_parser("dump-methods", help="Wypisz dostępne metody RPC") + + return parser + + +def main() -> int: + args = build_parser().parse_args() + + try: + rpc = make_rpc_client(args.url, args.timeout) + except Exception as exc: + print_json(rpc_error(exc, {"url": args.url})) + return 1 + + if args.command == "ping": + return cmd_ping(rpc, args) + + if args.command == "list": + return cmd_list(rpc, args) + + if args.command == "show": + return cmd_show(rpc, args) + + if args.command in ACTION_METHODS: + return cmd_single_action(rpc, args) + + if args.command in BULK_ACTION_METHODS: + return cmd_bulk_action(rpc, args) + + if args.command == "bulk-start-complete": + return cmd_bulk_start_complete(rpc, args) + + if args.command == "dump-methods": + return cmd_dump_methods(rpc, args) + + print_json({"ok": False, "error": f"Nieznana komenda: {args.command}"}) + return 1 + + +if __name__ == "__main__": + sys.exit(main())