Add rtorrent_cli.py

This commit is contained in:
gru
2026-05-15 10:31:32 +02:00
parent e602319921
commit d925f207e1

489
rtorrent_cli.py Normal file
View File

@@ -0,0 +1,489 @@
#!/usr/bin/env python3
"""
rtorrent_cli.py - proste CLI do masowego zarządzania rTorrent przez XML-RPC/SCGI.
Domyślny endpoint:
scgi://127.0.0.1:5000
Przykłady:
python3 rtorrent_cli.py ping
python3 rtorrent_cli.py list
python3 rtorrent_cli.py list --only-stopped --only-complete
python3 rtorrent_cli.py show HASH
python3 rtorrent_cli.py start HASH
python3 rtorrent_cli.py bulk-start --only-stopped --only-complete
python3 rtorrent_cli.py bulk-stop --name-regex "ubuntu|debian"
python3 rtorrent_cli.py bulk-announce --only-active
python3 rtorrent_cli.py bulk-check-hash --only-stopped --name-regex "movie"
python3 rtorrent_cli.py dump-methods
"""
from __future__ import annotations
import argparse
import json
import re
import socket
import sys
import xmlrpc.client
from dataclasses import dataclass, asdict
from typing import Any, Iterable
from urllib.parse import urlparse
DEFAULT_URL = "scgi://127.0.0.1:5000"
# ----------------------------
# SCGI XML-RPC transport
# ----------------------------
class SCGITransport(xmlrpc.client.Transport):
def __init__(self, host: str, port: int, timeout: int = 15):
super().__init__()
self.host = host
self.port = port
self.timeout = timeout
def request(self, host: str, handler: str, request_body: bytes, verbose: bool = False):
body = request_body.encode("utf-8") if isinstance(request_body, str) else request_body
headers = {
"CONTENT_LENGTH": str(len(body)),
"SCGI": "1",
"REQUEST_METHOD": "POST",
"REQUEST_URI": handler or "/RPC2",
}
header_bytes = b""
for key, value in headers.items():
header_bytes += key.encode("utf-8") + b"\x00" + value.encode("utf-8") + b"\x00"
packet = str(len(header_bytes)).encode("ascii") + b":" + header_bytes + b"," + body
with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock:
sock.sendall(packet)
response = b""
while True:
chunk = sock.recv(65536)
if not chunk:
break
response += chunk
# rTorrent przez SCGI zwykle zwraca same XML body, ale niektóre proxy mogą dodać nagłówki HTTP.
if b"\r\n\r\n" in response:
response = response.split(b"\r\n\r\n", 1)[1]
return self.parse_response_bytes(response)
def parse_response_bytes(self, data: bytes):
p, u = self.getparser()
p.feed(data)
p.close()
return u.close()
def make_rpc_client(url: str, timeout: int):
parsed = urlparse(url)
if parsed.scheme == "scgi":
if not parsed.hostname:
raise ValueError("SCGI URL musi mieć host, np. scgi://127.0.0.1:5000")
transport = SCGITransport(parsed.hostname, parsed.port or 5000, timeout=timeout)
return xmlrpc.client.ServerProxy(
"http://rtorrent/RPC2",
transport=transport,
allow_none=True,
)
return xmlrpc.client.ServerProxy(url, allow_none=True)
# ----------------------------
# Helpers
# ----------------------------
@dataclass
class Torrent:
hash: str
name: str
state: int
active: int
complete: int
size_bytes: int
completed_bytes: int
ratio: int
down_rate: int
up_rate: int
message: str
@property
def stopped(self) -> bool:
return self.state == 0
@property
def started_or_paused(self) -> bool:
return self.state == 1
@property
def is_active(self) -> bool:
return self.active == 1
@property
def is_complete(self) -> bool:
return self.complete == 1
def rpc_error(exc: Exception, context: dict[str, Any] | None = None) -> dict[str, Any]:
payload = {
"ok": False,
"error_type": exc.__class__.__name__,
"error": str(exc),
}
if context:
payload["context"] = context
return payload
def print_json(data: Any) -> None:
print(json.dumps(data, ensure_ascii=False, indent=2, sort_keys=False))
def call_method(rpc, method: str, *args):
return getattr(rpc, method)(*args)
def safe_call(rpc, method: str, *args, context: dict[str, Any] | None = None):
try:
return True, call_method(rpc, method, *args)
except Exception as exc:
return False, rpc_error(exc, context=context or {"method": method, "args": args})
def human_bytes(num: int) -> str:
value = float(num)
for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]:
if abs(value) < 1024:
return f"{value:.1f} {unit}"
value /= 1024
return f"{value:.1f} EiB"
# ----------------------------
# rTorrent API
# ----------------------------
def get_torrent_hashes(rpc, view: str = "main") -> list[str]:
ok, result = safe_call(rpc, "d.multicall2", "", view, "d.hash=")
if not ok:
raise RuntimeError(json.dumps(result, ensure_ascii=False))
hashes: list[str] = []
for row in result:
if isinstance(row, list) and row:
hashes.append(str(row[0]))
elif isinstance(row, str):
hashes.append(row)
return hashes
def list_torrents(rpc, view: str = "main") -> list[Torrent]:
methods = [
"d.hash=",
"d.name=",
"d.state=",
"d.is_active=",
"d.complete=",
"d.size_bytes=",
"d.completed_bytes=",
"d.ratio=",
"d.down.rate=",
"d.up.rate=",
"d.message=",
]
ok, result = safe_call(rpc, "d.multicall2", "", view, *methods)
if not ok:
raise RuntimeError(json.dumps(result, ensure_ascii=False))
torrents: list[Torrent] = []
for row in result:
torrents.append(Torrent(
hash=str(row[0]),
name=str(row[1]),
state=int(row[2]),
active=int(row[3]),
complete=int(row[4]),
size_bytes=int(row[5]),
completed_bytes=int(row[6]),
ratio=int(row[7]),
down_rate=int(row[8]),
up_rate=int(row[9]),
message=str(row[10]),
))
return torrents
def get_torrent(rpc, hash_: str) -> Torrent:
torrents = list_torrents(rpc)
for torrent in torrents:
if torrent.hash.lower() == hash_.lower():
return torrent
raise KeyError(f"Nie znaleziono torrenta: {hash_}")
def filter_torrents(torrents: Iterable[Torrent], args) -> list[Torrent]:
result = list(torrents)
if getattr(args, "only_stopped", False):
result = [t for t in result if t.stopped]
if getattr(args, "only_started", False):
result = [t for t in result if t.started_or_paused]
if getattr(args, "only_active", False):
result = [t for t in result if t.is_active]
if getattr(args, "only_complete", False):
result = [t for t in result if t.is_complete]
if getattr(args, "only_incomplete", False):
result = [t for t in result if not t.is_complete]
if getattr(args, "name_regex", None):
pattern = re.compile(args.name_regex, re.IGNORECASE)
result = [t for t in result if pattern.search(t.name)]
if getattr(args, "hash_regex", None):
pattern = re.compile(args.hash_regex, re.IGNORECASE)
result = [t for t in result if pattern.search(t.hash)]
return result
def torrent_to_dict(t: Torrent) -> dict[str, Any]:
data = asdict(t)
data["size"] = human_bytes(t.size_bytes)
data["completed"] = human_bytes(t.completed_bytes)
data["ratio_float"] = round(t.ratio / 1000, 3)
return data
# ----------------------------
# Commands
# ----------------------------
def cmd_ping(rpc, args) -> int:
ok, result = safe_call(rpc, "system.client_version")
if ok:
print_json({"ok": True, "client_version": result})
return 0
# fallback dla starszych buildów
ok, result = safe_call(rpc, "system.listMethods")
print_json({"ok": ok, "result": result})
return 0 if ok else 1
def cmd_list(rpc, args) -> int:
try:
torrents = filter_torrents(list_torrents(rpc, args.view), args)
print_json({
"ok": True,
"count": len(torrents),
"torrents": [torrent_to_dict(t) for t in torrents],
})
return 0
except Exception as exc:
print_json(rpc_error(exc))
return 1
def cmd_show(rpc, args) -> int:
try:
t = get_torrent(rpc, args.hash)
print_json({"ok": True, "torrent": torrent_to_dict(t)})
return 0
except Exception as exc:
print_json(rpc_error(exc, {"hash": args.hash}))
return 1
ACTION_METHODS = {
"start": "d.start",
"stop": "d.stop",
"open": "d.open",
"close": "d.close",
"check-hash": "d.check_hash",
"announce": "d.tracker_announce",
}
def cmd_single_action(rpc, args) -> int:
method = ACTION_METHODS[args.command]
ok, result = safe_call(rpc, method, args.hash, context={"hash": args.hash, "method": method})
print_json({
"ok": ok,
"action": args.command,
"hash": args.hash,
"result": result,
})
return 0 if ok else 1
BULK_ACTION_METHODS = {
"bulk-start": "d.start",
"bulk-stop": "d.stop",
"bulk-open": "d.open",
"bulk-close": "d.close",
"bulk-check-hash": "d.check_hash",
"bulk-announce": "d.tracker_announce",
}
def cmd_bulk_action(rpc, args) -> int:
try:
torrents = filter_torrents(list_torrents(rpc, args.view), args)
except Exception as exc:
print_json(rpc_error(exc))
return 1
method = BULK_ACTION_METHODS[args.command]
results = []
if args.dry_run:
print_json({
"ok": True,
"dry_run": True,
"action": args.command,
"method": method,
"count": len(torrents),
"targets": [torrent_to_dict(t) for t in torrents],
})
return 0
failed = 0
for t in torrents:
ok, result = safe_call(rpc, method, t.hash, context={"hash": t.hash, "name": t.name, "method": method})
if not ok:
failed += 1
results.append({
"ok": ok,
"hash": t.hash,
"name": t.name,
"result": result,
})
print_json({
"ok": failed == 0,
"action": args.command,
"method": method,
"count": len(torrents),
"failed": failed,
"results": results,
})
return 0 if failed == 0 else 2
def cmd_bulk_start_complete(rpc, args) -> int:
args.only_complete = True
args.command = "bulk-start"
return cmd_bulk_action(rpc, args)
def cmd_dump_methods(rpc, args) -> int:
ok, result = safe_call(rpc, "system.listMethods")
print_json({"ok": ok, "methods": result if ok else None, "error": None if ok else result})
return 0 if ok else 1
# ----------------------------
# CLI
# ----------------------------
def add_filters(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--view", default="main", help="Widok rTorrent, domyślnie: main")
parser.add_argument("--only-stopped", action="store_true", help="Tylko state=0")
parser.add_argument("--only-started", action="store_true", help="Tylko state=1")
parser.add_argument("--only-active", action="store_true", help="Tylko is_active=1")
parser.add_argument("--only-complete", action="store_true", help="Tylko complete=1")
parser.add_argument("--only-incomplete", action="store_true", help="Tylko complete=0")
parser.add_argument("--name-regex", help="Filtr regex po nazwie")
parser.add_argument("--hash-regex", help="Filtr regex po hashu")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="CLI do diagnozy i masowego sterowania rTorrent przez XML-RPC/SCGI."
)
parser.add_argument(
"--url",
default=DEFAULT_URL,
help=f"Endpoint RPC, domyślnie: {DEFAULT_URL}",
)
parser.add_argument("--timeout", type=int, default=15, help="Timeout połączenia w sekundach")
sub = parser.add_subparsers(dest="command", required=True)
sub.add_parser("ping", help="Test połączenia")
p = sub.add_parser("list", help="Lista torrentów")
add_filters(p)
p = sub.add_parser("show", help="Szczegóły torrenta")
p.add_argument("hash")
for name in ACTION_METHODS:
p = sub.add_parser(name, help=f"Wykonaj {name} na jednym torrencie")
p.add_argument("hash")
for name in BULK_ACTION_METHODS:
p = sub.add_parser(name, help=f"Masowo wykonaj {name}")
add_filters(p)
p.add_argument("--dry-run", action="store_true", help="Pokaż cele bez wykonywania akcji")
p = sub.add_parser("bulk-start-complete", help="Wystartuj kompletne torrenty")
add_filters(p)
p.add_argument("--dry-run", action="store_true", help="Pokaż cele bez wykonywania akcji")
sub.add_parser("dump-methods", help="Wypisz dostępne metody RPC")
return parser
def main() -> int:
args = build_parser().parse_args()
try:
rpc = make_rpc_client(args.url, args.timeout)
except Exception as exc:
print_json(rpc_error(exc, {"url": args.url}))
return 1
if args.command == "ping":
return cmd_ping(rpc, args)
if args.command == "list":
return cmd_list(rpc, args)
if args.command == "show":
return cmd_show(rpc, args)
if args.command in ACTION_METHODS:
return cmd_single_action(rpc, args)
if args.command in BULK_ACTION_METHODS:
return cmd_bulk_action(rpc, args)
if args.command == "bulk-start-complete":
return cmd_bulk_start_complete(rpc, args)
if args.command == "dump-methods":
return cmd_dump_methods(rpc, args)
print_json({"ok": False, "error": f"Nieznana komenda: {args.command}"})
return 1
if __name__ == "__main__":
sys.exit(main())