diff --git a/pytorrent/migrations.py b/pytorrent/migrations.py index ac5dd83..2fc0e33 100644 --- a/pytorrent/migrations.py +++ b/pytorrent/migrations.py @@ -6,11 +6,8 @@ MIGRATIONS: tuple[str, ...] = () def run_database_migrations(conn: sqlite3.Connection) -> int: - """Run pending database migrations. - - Note: no migrations are currently required because supported databases are - already expected to use the current schema version. - """ + """Run pending database migrations.""" + applied = 0 for sql in MIGRATIONS: conn.execute(sql) diff --git a/pytorrent/services/rtorrent_original_TO_DELETE b/pytorrent/services/rtorrent_original_TO_DELETE deleted file mode 100644 index 7d9ca71..0000000 --- a/pytorrent/services/rtorrent_original_TO_DELETE +++ /dev/null @@ -1,1993 +0,0 @@ -from __future__ import annotations - -import errno -import os -import posixpath -import socket -import time -import uuid -from urllib.parse import urlparse -from xmlrpc.client import Binary, dumps, loads -from pathlib import Path as LocalPath -from ..utils import human_rate, human_size -from ..db import connect, default_user_id, utcnow -from ..config import PYTORRENT_TMP_DIR, REMOTE_READ_CHUNK_BYTES - - -class ScgiMethod: - def __init__(self, client: "ScgiRtorrentClient", name: str): - self.client = client - self.name = name - - def __getattr__(self, name: str): - return ScgiMethod(self.client, f"{self.name}.{name}") - - def __call__(self, *args): - return self.client.call(self.name, *args) - - -class ScgiRtorrentClient: - """XML-RPC over SCGI client for rTorrent network.scgi.open_port.""" - - def __init__(self, url: str, timeout: int = 5): - parsed = urlparse(url) - if parsed.scheme != "scgi": - raise ValueError("SCGI URL must start with scgi://") - if not parsed.hostname or not parsed.port: - raise ValueError("SCGI URL must include host and port, e.g. scgi://127.0.0.1:5000/RPC2") - self.host = parsed.hostname - self.port = parsed.port - self.timeout = timeout - self.path = parsed.path or "/RPC2" - - def __getattr__(self, name: str): - return ScgiMethod(self, name) - - def call(self, method_name: str, *args): - body = dumps(args, methodname=method_name, allow_none=True).encode("utf-8") - headers = { - "CONTENT_LENGTH": str(len(body)), - "SCGI": "1", - "REQUEST_METHOD": "POST", - "REQUEST_URI": self.path, - "SCRIPT_NAME": self.path, - "SERVER_PROTOCOL": "HTTP/1.1", - "CONTENT_TYPE": "text/xml", - } - header_blob = b"".join(k.encode() + b"\0" + v.encode() + b"\0" for k, v in headers.items()) - payload = str(len(header_blob)).encode("ascii") + b":" + header_blob + b"," + body - attempts = _scgi_retry_attempts() - last_exc = None - for attempt in range(1, attempts + 1): - try: - with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock: - sock.settimeout(self.timeout) - sock.sendall(payload) - chunks: list[bytes] = [] - while True: - chunk = sock.recv(65536) - if not chunk: - break - chunks.append(chunk) - response = b"".join(chunks) - if not response: - raise ConnectionError("Empty response from rTorrent SCGI") - if b"\r\n\r\n" in response: - response = response.split(b"\r\n\r\n", 1)[1] - elif b"\n\n" in response: - response = response.split(b"\n\n", 1)[1] - result, _ = loads(response) - return result[0] if len(result) == 1 else result - except Exception as exc: - last_exc = exc - if attempt >= attempts or not _is_transient_scgi_error(exc): - raise - time.sleep(_scgi_retry_delay(attempt)) - raise last_exc or ConnectionError("rTorrent SCGI call failed") - - -def _scgi_retry_attempts() -> int: - # Note: Short retry/backoff protects bulk operations from temporary Errno 111 during high rTorrent load. - try: - return max(1, min(10, int(os.environ.get("PYTORRENT_SCGI_RETRIES", "5")))) - except Exception: - return 5 - - -def _scgi_retry_delay(attempt: int) -> float: - return min(5.0, 0.35 * (2 ** max(0, attempt - 1))) - - -def _is_transient_scgi_error(exc: Exception) -> bool: - # Note: Retry covers common temporary SCGI/socket errors but does not hide semantic XML-RPC errors. - if isinstance(exc, (ConnectionRefusedError, ConnectionResetError, TimeoutError, socket.timeout)): - return True - err_no = getattr(exc, "errno", None) - if err_no in {errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT, errno.EHOSTUNREACH, errno.ENETUNREACH}: - return True - msg = str(exc).lower() - return any(text in msg for text in ("connection refused", "connection reset", "timed out", "timeout", "empty response", "pipe creation failed", "resource temporarily unavailable", "try again", "temporarily unavailable")) - - -def client_for(profile: dict) -> ScgiRtorrentClient: - return ScgiRtorrentClient(profile["scgi_url"], int(profile.get("timeout_seconds") or 5)) - - -_UNSUPPORTED_EXEC_METHODS: set[str] = set() -_EXEC_TARGET_STYLE: dict[str, int] = {} - -def _rt_execute_preview(method_name: str, call_args: tuple) -> str: - # Note: The compact RPC summary removes long scripts from error messages while keeping the method and first arguments for diagnostics. - preview = ", ".join(repr(x) for x in call_args[:3]) - if len(call_args) > 3: - preview += ", ..." - return f"{method_name}({preview})" - - -def _rt_execute_target_variants(method: str, args: tuple) -> list[tuple]: - # Note: Depending on version, rTorrent XML-RPC either requires or rejects an empty target; cache the working variant per method. - variants = [("", *args), args] - preferred = _EXEC_TARGET_STYLE.get(method) - if preferred is not None and 0 <= preferred < len(variants): - return [variants[preferred]] + [v for i, v in enumerate(variants) if i != preferred] - return variants - - -def _is_rt_method_missing(exc: Exception) -> bool: - msg = str(exc).lower() - return "not defined" in msg or "no such method" in msg or "unknown method" in msg - - -def _rt_execute_methods(method: str) -> list[str]: - # Note: execute2.* is tried only when the base execute.* method does not exist to avoid false retry errors. - methods = [method] - if method.startswith("execute."): - fallback = method.replace("execute.", "execute2.", 1) - if fallback not in _UNSUPPORTED_EXEC_METHODS: - methods.append(fallback) - return methods - - -def _rt_execute(c: ScgiRtorrentClient, method: str, *args): - """Run rTorrent execute.* as the rTorrent user across XML-RPC variants.""" - errors: list[str] = [] - attempts = _scgi_retry_attempts() - for attempt in range(1, attempts + 1): - errors.clear() - transient_seen = False - primary_missing = False - for method_index, method_name in enumerate(_rt_execute_methods(method)): - if method_name in _UNSUPPORTED_EXEC_METHODS: - continue - if method_index > 0 and not primary_missing: - continue - for call_args in _rt_execute_target_variants(method_name, args): - try: - result = c.call(method_name, *call_args) - if method_name == method: - _EXEC_TARGET_STYLE[method_name] = 0 if call_args and call_args[0] == "" else 1 - return result - except Exception as exc: - if _is_rt_method_missing(exc): - _UNSUPPORTED_EXEC_METHODS.add(method_name) - if method_name == method: - primary_missing = True - errors.append(f"{method_name}: method not defined") - break - transient_seen = transient_seen or _is_transient_scgi_error(exc) - errors.append(f"{_rt_execute_preview(method_name, call_args)}: {exc}") - if transient_seen and attempt < attempts: - time.sleep(_scgi_retry_delay(attempt)) - continue - break - raise RuntimeError("rTorrent execute failed: " + "; ".join(errors)) - - -def _is_rt_timeout_error(exc: Exception) -> bool: - msg = str(exc).lower() - return isinstance(exc, (TimeoutError, socket.timeout)) or "timed out" in msg or "timeout" in msg - - -def _rt_execute_allow_timeout(c: ScgiRtorrentClient, method: str, *args): - try: - return _rt_execute(c, method, *args) - except Exception as exc: - if _is_rt_timeout_error(exc): - return None - raise - - -def _remote_clean_path(path: str) -> str: - path = str(path or "").strip() - return posixpath.normpath(path) if path else path - - -def _remote_join(*parts: str) -> str: - cleaned = [str(p).strip().rstrip("/") for p in parts if str(p).strip()] - return posixpath.normpath(posixpath.join(*cleaned)) if cleaned else "" - - -def _run_remote_move(c: ScgiRtorrentClient, src: str, dst: str, poll_interval: float = 2.0) -> None: - """Run a remote mv without binding the transfer time to the SCGI timeout.""" - token = uuid.uuid4().hex - status_path = f"/tmp/pytorrent-move-{token}.status" - start_script = ( - 'src=$1; dst=$2; status=$3; tmp=${status}.tmp; ' - 'rm -f "$status" "$tmp"; ' - '( ' - 'rc=0; ' - 'parent=${dst%/*}; ' - 'if [ -z "$dst" ] || [ "$dst" = "/" ]; then echo "unsafe destination: $dst" >&2; rc=5; fi; ' - 'if [ $rc -eq 0 ] && [ -n "$parent" ] && [ "$parent" != "$dst" ]; then mkdir -p "$parent" || rc=$?; fi; ' - 'if [ $rc -eq 0 ] && [ "$src" = "$dst" ]; then :; ' - 'elif [ $rc -eq 0 ] && { [ -e "$dst" ] || [ -L "$dst" ]; } && [ ! -e "$src" ] && [ ! -L "$src" ]; then :; ' - 'elif [ $rc -eq 0 ] && [ ! -e "$src" ] && [ ! -L "$src" ]; then echo "source missing: $src" >&2; rc=3; ' - 'elif [ $rc -eq 0 ] && { [ -e "$dst" ] || [ -L "$dst" ]; }; then rm -rf -- "$dst" && mv -f -- "$src" "$dst" || rc=$?; ' - 'elif [ $rc -eq 0 ]; then mv -f -- "$src" "$dst" || rc=$?; ' - 'fi; ' - 'if [ $rc -eq 0 ]; then printf "OK\n" > "$status"; ' - 'else printf "ERR %s\n" "$rc" > "$status"; fi; ' - 'if [ -s "$tmp" ]; then cat "$tmp" >> "$status"; fi; ' - 'rm -f "$tmp" ' - ') > "$tmp" 2>&1 &' - ) - poll_script = 'status=$1; [ -f "$status" ] && cat "$status" || true' - cleanup_script = 'rm -f "$1"' - - _rt_execute_allow_timeout(c, "execute.throw", "sh", "-c", start_script, "pytorrent-move-start", src, dst, status_path) - - while True: - time.sleep(max(0.25, poll_interval)) - try: - output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-move-poll", status_path) or "").strip() - except Exception as exc: - # Note: During bulk moves, rTorrent may briefly not create the execute.capture pipe; polling waits and retries. - if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc): - continue - raise - if not output: - continue - try: - _rt_execute(c, "execute.throw", "sh", "-c", cleanup_script, "pytorrent-move-clean", status_path) - except Exception: - pass - first_line = output.splitlines()[0].strip() - if first_line == "OK": - return - if first_line.startswith("ERR"): - details = "\n".join(output.splitlines()[1:]).strip() - raise RuntimeError(details or first_line) - raise RuntimeError(output) - - -def _torrent_data_path(c: ScgiRtorrentClient, torrent_hash: str) -> str: - """Return data path as rTorrent sees it; do not touch pyTorrent local FS.""" - try: - src = str(c.call("d.base_path", torrent_hash) or "").strip() - if src: - return src - except Exception: - pass - directory = str(c.call("d.directory", torrent_hash) or "").strip() - name = str(c.call("d.name", torrent_hash) or "").strip() - try: - is_multi = int(c.call("d.is_multi_file", torrent_hash) or 0) - except Exception: - is_multi = 0 - if is_multi: - return directory - if directory and name: - return _remote_join(directory, name) - return directory - - -def _safe_rm_rf_path(path: str) -> str: - path = _remote_clean_path(path) - if not path or path in {"/", "."}: - raise ValueError("Refusing to remove an unsafe data path") - if path.rstrip("/").count("/") < 1: - raise ValueError(f"Refusing to remove an unsafe data path: {path}") - return path - - -def _run_remote_rm(c: ScgiRtorrentClient, path: str, poll_interval: float = 2.0) -> None: - # Note: rm -rf runs in the background on the rTorrent side, so long deletes do not hold a single SCGI connection. - token = uuid.uuid4().hex - status_path = f"/tmp/pytorrent-rm-{token}.status" - script = ( - 'target=$1; status=$2; tmp=${status}.tmp; ' - 'rm -f "$status" "$tmp"; ' - '( rc=0; ' - 'if [ -z "$target" ] || [ "$target" = "/" ] || [ "$target" = "." ]; then echo "unsafe remove target: $target" >&2; rc=5; ' - 'else rm -rf -- "$target" || rc=$?; fi; ' - 'if [ $rc -eq 0 ]; then printf "OK\n" > "$status"; else printf "ERR %s\n" "$rc" > "$status"; fi; ' - 'if [ -s "$tmp" ]; then cat "$tmp" >> "$status"; fi; ' - 'rm -f "$tmp" ) > "$tmp" 2>&1 &' - ) - poll_script = 'status=$1; [ -f "$status" ] && cat "$status" || true' - cleanup_script = 'rm -f "$1"' - _rt_execute_allow_timeout(c, "execute.throw", "sh", "-c", script, "pytorrent-rm-start", path, status_path) - while True: - time.sleep(max(0.25, poll_interval)) - try: - output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-rm-poll", status_path) or "").strip() - except Exception as exc: - # Note: Remove uses the same safe polling as move, so a temporary missing pipe does not fail the whole queue. - if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc): - continue - raise - if not output: - continue - try: - _rt_execute(c, "execute.throw", "sh", "-c", cleanup_script, "pytorrent-rm-clean", status_path) - except Exception: - pass - first_line = output.splitlines()[0].strip() - if first_line == "OK": - return - if first_line.startswith("ERR"): - details = "\n".join(output.splitlines()[1:]).strip() - raise RuntimeError(details or first_line) - raise RuntimeError(output) - - -def _remove_torrent_data(c: ScgiRtorrentClient, torrent_hash: str) -> dict: - data_path = _safe_rm_rf_path(_torrent_data_path(c, torrent_hash)) - try: - c.call("d.stop", torrent_hash) - except Exception: - pass - try: - c.call("d.close", torrent_hash) - except Exception: - pass - _run_remote_rm(c, data_path) - return {"hash": torrent_hash, "removed_path": data_path} - - -def browse_path(profile: dict, path: str | None = None) -> dict: - """List directories through rTorrent execute.capture to avoid pyTorrent FS permissions.""" - c = client_for(profile) - base = _remote_clean_path(path or default_download_path(profile)) - script = ( - 'base=$1; ' - '[ -d "$base" ] || exit 2; ' - 'for p in "$base"/* "$base"/.[!.]* "$base"/..?*; do ' - '[ -d "$p" ] || continue; ' - 'name=${p##*/}; ' - 'printf "%s\t%s\n" "$name" "$p"; ' - 'done' - ) - output = _rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-browse", base) - dirs = [] - for line in str(output or "").splitlines(): - if "\t" not in line: - continue - name, full_path = line.split("\t", 1) - if name not in {".", ".."}: - dirs.append({"name": name, "path": full_path}) - dirs.sort(key=lambda x: x["name"].lower()) - parent = posixpath.dirname(base.rstrip("/")) or "/" - if parent == base: - parent = base - return {"path": base, "parent": parent, "dirs": dirs[:300], "source": "rtorrent"} - - -POST_CHECK_DOWNLOAD_LABEL = "To download after check" -_POST_CHECK_WATCH_TTL_SECONDS = 48 * 60 * 60 -_POST_CHECK_WATCH_MIN_SECONDS = 2.0 -_POST_CHECK_WATCH: dict[int, dict[str, float]] = {} - - -def _mark_post_check_watch(profile_id: int, torrent_hash: str) -> None: - if not torrent_hash: - return - _POST_CHECK_WATCH.setdefault(int(profile_id), {})[str(torrent_hash)] = time.time() - - -def _clear_post_check_watch(profile_id: int, torrent_hash: str) -> None: - profile_watch = _POST_CHECK_WATCH.get(int(profile_id)) - if not profile_watch: - return - profile_watch.pop(str(torrent_hash), None) - if not profile_watch: - _POST_CHECK_WATCH.pop(int(profile_id), None) - - -def _is_post_check_watched(profile_id: int, torrent_hash: str) -> bool: - profile_watch = _POST_CHECK_WATCH.get(int(profile_id)) or {} - started_at = profile_watch.get(str(torrent_hash)) - if not started_at: - return False - age = time.time() - started_at - if age > _POST_CHECK_WATCH_TTL_SECONDS: - _clear_post_check_watch(profile_id, torrent_hash) - return False - # Note: A short grace period prevents labeling a recheck that was queued but has not visibly entered hashing yet. - return age >= _POST_CHECK_WATCH_MIN_SECONDS - - -def _label_names(value: str) -> list[str]: - names: list[str] = [] - for part in str(value or "").replace(";", ",").replace("|", ",").split(","): - label = part.strip() - if label and label not in names: - names.append(label) - return names - - -def _label_value(labels: list[str]) -> str: - return ", ".join([label for label in labels if str(label or "").strip()]) - - -def _without_post_check_download_label(value: str | None) -> str: - return _label_value([label for label in _label_names(str(value or "")) if label != POST_CHECK_DOWNLOAD_LABEL]) - - -def clear_post_check_download_label(c: ScgiRtorrentClient, torrent_hash: str, current_label: str | None = None) -> bool: - label_source = current_label - if label_source is None: - try: - label_source = str(c.call("d.custom1", str(torrent_hash or "")) or "") - except Exception: - label_source = "" - labels = _label_names(str(label_source or "")) - if POST_CHECK_DOWNLOAD_LABEL not in labels: - return False - # Note: The temporary post-check label is removed only after the torrent leaves the stopped waiting queue. - c.call("d.custom1.set", str(torrent_hash or ""), _label_value([label for label in labels if label != POST_CHECK_DOWNLOAD_LABEL])) - return True - - -def _message_indicates_active_check(message: str) -> bool: - msg = str(message or "").lower() - if not msg: - return False - finished_markers = ("complete", "completed", "finished", "success", "succeeded", "failed", "done") - if any(marker in msg for marker in finished_markers): - return False - active_markers = ("checking", "hashing", "hash check queued", "hash check scheduled", "check hash queued", "recheck queued", "rechecking") - return any(marker in msg for marker in active_markers) - - -def _row_progress_complete(row: dict) -> bool: - size = int(row.get("size") or 0) - completed = int(row.get("completed_bytes") or 0) - return bool(row.get("complete")) or (size > 0 and completed >= size) or float(row.get("progress") or 0) >= 100.0 - - -def _cleanup_post_check_label_if_ready(c: ScgiRtorrentClient, row: dict) -> bool: - labels = _label_names(str(row.get("label") or "")) - if POST_CHECK_DOWNLOAD_LABEL not in labels: - return False - status = str(row.get("status") or "").lower() - started_after_wait = bool(int(row.get("state") or 0)) and status != "checking" - if not (_row_progress_complete(row) or status == "seeding" or started_after_wait): - return False - # Note: Keep the post-check label while the torrent is stopped; remove it once it is started for download/seeding. - clear_post_check_download_label(c, str(row.get("hash") or ""), str(row.get("label") or "")) - row["label"] = _without_post_check_download_label(str(row.get("label") or "")) - return True - - -def apply_post_check_policy(profile: dict, rows: list[dict], previous_rows: dict[str, dict] | None = None) -> list[dict]: - """Start complete torrents after check; stop and label incomplete ones for Smart Queue.""" - previous_rows = previous_rows or {} - profile_id = int(profile.get("id") or 0) - c = client_for(profile) - changes: list[dict] = [] - for row in rows: - h = str(row.get("hash") or "") - prev = previous_rows.get(h) or {} - try: - if h and _cleanup_post_check_label_if_ready(c, row): - changes.append({"hash": h, "action": "remove_post_check_label"}) - except Exception as exc: - changes.append({"hash": h, "action": "remove_post_check_label_failed", "error": str(exc)}) - was_checking = str(prev.get("status") or "") == "Checking" or int(prev.get("hashing") or 0) > 0 - watched_recheck = _is_post_check_watched(profile_id, h) - is_checking = str(row.get("status") or "") == "Checking" or int(row.get("hashing") or 0) > 0 - if not h or not (was_checking or watched_recheck) or is_checking: - continue - complete = _row_progress_complete(row) - try: - if complete: - # Note: A fully checked torrent is started with the same helper as the manual Start action so it seeds immediately. - start_result = start_or_resume_hash(c, h) - clear_post_check_download_label(c, h, str(row.get("label") or "")) - row.update({"state": 1, "active": 1, "paused": False, "status": "Seeding", "label": _without_post_check_download_label(str(row.get("label") or ""))}) - changes.append({"hash": h, "action": "start_seed_after_check", "complete": True, "result": start_result}) - else: - labels = _label_names(str(row.get("label") or "")) - if POST_CHECK_DOWNLOAD_LABEL not in labels: - labels.append(POST_CHECK_DOWNLOAD_LABEL) - label_value = _label_value(labels) - # Note: Incomplete torrents are left stopped after check so Smart Queue can start them later within the global limit. - c.call("d.stop", h) - try: - c.call("d.close", h) - except Exception: - pass - c.call("d.custom1.set", h, label_value) - row.update({"state": 0, "active": 0, "paused": False, "status": "Stopped", "label": label_value}) - changes.append({"hash": h, "action": "stop_and_label_after_check", "complete": False, "label": POST_CHECK_DOWNLOAD_LABEL}) - _clear_post_check_watch(profile_id, h) - except Exception as exc: - changes.append({"hash": h, "action": "post_check_policy_failed", "error": str(exc)}) - return changes - - -TORRENT_FIELDS = [ - "d.hash=", "d.name=", "d.state=", "d.complete=", "d.size_bytes=", "d.completed_bytes=", - "d.ratio=", "d.up.rate=", "d.down.rate=", "d.up.total=", "d.down.total=", "d.peers_connected=", - "d.peers_complete=", "d.priority=", "d.directory=", "d.base_path=", "d.creation_date=", "d.custom1=", - "d.custom=py_ratio_group", "d.message=", "d.hashing=", "d.is_active=", "d.is_multi_file=", -] - -TORRENT_OPTIONAL_FIELDS = [ - "d.timestamp.finished=", -] - - -def human_duration(seconds: int) -> str: - # Note: Download ETA is derived locally from remaining bytes and current download speed. - seconds = max(0, int(seconds or 0)) - if seconds <= 0: - return '-' - days, rem = divmod(seconds, 86400) - hours, rem = divmod(rem, 3600) - minutes, _ = divmod(rem, 60) - if days: - return f"{days}d {hours}h" - if hours: - return f"{hours}h {minutes}m" - return f"{minutes}m" - - -def normalize_row(row: list) -> dict: - size = int(row[4] or 0) - completed = int(row[5] or 0) - progress = 100.0 if size <= 0 and int(row[3] or 0) else round((completed / size) * 100, 2) if size else 0.0 - ratio_raw = int(row[6] or 0) - down_rate = int(row[8] or 0) - up_rate = int(row[7] or 0) - remaining_bytes = max(0, size - completed) - eta_seconds = int(remaining_bytes / down_rate) if down_rate > 0 and not int(row[3] or 0) else 0 - directory = str(row[14] or "") - base_path = str(row[15] or "") - is_multi_file = int(row[22] or 0) if len(row) > 22 else 0 - completed_at = int(row[23] or 0) if len(row) > 23 else 0 - - # Show the selected download location only. Hide the torrent root - # directory for multi-file torrents and the filename for single-file - # torrents. Data deletion still uses the full d.base_path elsewhere. - if base_path and base_path != "/": - display_parent = posixpath.dirname(base_path.rstrip("/")) or "/" - display_path = display_parent.rstrip("/") + "/" if display_parent != "/" else display_parent - elif directory and is_multi_file and directory != "/": - display_parent = posixpath.dirname(directory.rstrip("/")) or "/" - display_path = display_parent.rstrip("/") + "/" if display_parent != "/" else display_parent - elif directory: - display_path = directory.rstrip("/") + "/" if directory != "/" else directory - else: - display_path = "" - msg = str(row[19] or "") - msg_l = msg.lower() - hashing = int(row[20] or 0) if len(row) > 20 else 0 - is_active = int(row[21] or 0) if len(row) > 21 else int(row[2] or 0) - state = int(row[2] or 0) - complete = int(row[3] or 0) - # Note: d.hashing is authoritative; stale "hash check complete" messages must not keep the UI in Checking forever. - is_checking = bool(hashing) or _message_indicates_active_check(msg_l) - is_paused = bool(state) and not bool(is_active) and not is_checking - status = "Checking" if is_checking else "Paused" if is_paused else "Seeding" if complete and state else "Downloading" if state else "Stopped" - return { - "hash": str(row[0] or ""), - "name": str(row[1] or ""), - "state": state, - "active": is_active, - "paused": is_paused, - "complete": complete, - "size": size, - "size_h": human_size(size), - "completed_bytes": completed, - "progress": progress, - "ratio": round(ratio_raw / 1000, 3), - "up_rate": up_rate, - "up_rate_h": human_rate(up_rate), - "down_rate": down_rate, - "down_rate_h": human_rate(down_rate), - "eta_seconds": eta_seconds, - "eta_h": human_duration(eta_seconds) if eta_seconds else "-", - "up_total": int(row[9] or 0), - "up_total_h": human_size(row[9] or 0), - "down_total": int(row[10] or 0), - "down_total_h": human_size(row[10] or 0), - "peers": int(row[11] or 0), - "seeds": int(row[12] or 0), - "priority": int(row[13] or 0), - "path": display_path, - "created": int(row[16] or 0), - "completed_at": completed_at, - "label": str(row[17] or ""), - "ratio_group": str(row[18] or ""), - "message": msg, - "status": status, - "hashing": hashing, - } - - -def list_torrents(profile: dict) -> list[dict]: - c = client_for(profile) - try: - rows = c.d.multicall2("", "main", *(TORRENT_FIELDS + TORRENT_OPTIONAL_FIELDS)) - except Exception: - # Keep compatibility with older rTorrent builds that do not expose optional timestamp fields. - rows = c.d.multicall2("", "main", *TORRENT_FIELDS) - return [normalize_row(list(row)) for row in rows] - - -_DISK_USAGE_CACHE: dict[str, tuple[float, dict]] = {} -_DISK_USAGE_TTL_SECONDS = 30.0 -_REMOTE_USAGE_CACHE: dict[int, tuple[float, dict]] = {} -_REMOTE_USAGE_TTL_SECONDS = 60.0 -_REMOTE_PUBLIC_IP_CACHE: dict[int, tuple[float, str]] = {} -_REMOTE_PUBLIC_IP_TTL_SECONDS = 6 * 60 * 60.0 - - -def remote_public_ip(profile: dict, force: bool = False) -> str: - profile_id = int(profile.get("id") or 0) - now = time.monotonic() - cached = _REMOTE_PUBLIC_IP_CACHE.get(profile_id) - if cached and not force and now - cached[0] < _REMOTE_PUBLIC_IP_TTL_SECONDS: - return cached[1] - script = ( - 'for url in https://ifconfig.co https://ifconfig.me https://ipapi.linuxiarz.pl http://ifconfig.co http://ifconfig.me; do ' - 'ip=$(curl -fsS --max-time 8 "$url" 2>/dev/null | tr -d "\r" | head -n 1 | sed "s/[^0-9a-fA-F:.]//g"); ' - 'if [ -n "$ip" ]; then printf "%s" "$ip"; exit 0; fi; ' - 'done; exit 1' - ) - value = str(_rt_execute(client_for(profile), "execute.capture", "sh", "-c", script) or "").strip() - if not value: - raise RuntimeError("Cannot read remote public IP") - _REMOTE_PUBLIC_IP_CACHE[profile_id] = (now, value) - return value - - -def remote_system_usage(profile: dict, force: bool = False) -> dict: - profile_id = int(profile.get("id") or 0) - now = time.monotonic() - cached = _REMOTE_USAGE_CACHE.get(profile_id) - if cached and not force and now - cached[0] < _REMOTE_USAGE_TTL_SECONDS: - usage = dict(cached[1]) - usage["cached"] = True - return usage - script = ( - 'read cpu user nice system idle iowait irq softirq steal guest guest_nice < /proc/stat; ' - 'total1=$((user+nice+system+idle+iowait+irq+softirq+steal)); idle1=$((idle+iowait)); ' - 'sleep 1; ' - 'read cpu user nice system idle iowait irq softirq steal guest guest_nice < /proc/stat; ' - 'total2=$((user+nice+system+idle+iowait+irq+softirq+steal)); idle2=$((idle+iowait)); ' - 'dt=$((total2-total1)); di=$((idle2-idle1)); ' - 'cpu_pct=$(awk -v dt="$dt" -v di="$di" "BEGIN { if (dt > 0) printf \"%.1f\", (dt-di)*100/dt; else printf \"0.0\" }"); ' - "mem_total=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo); " - "mem_avail=$(awk '/^MemAvailable:/ {print $2}' /proc/meminfo); " - 'ram_pct=$(awk -v t="$mem_total" -v a="$mem_avail" "BEGIN { if (t > 0) printf \"%.1f\", (t-a)*100/t; else printf \"0.0\" }"); ' - 'printf "%s %s" "$cpu_pct" "$ram_pct"' - ) - output = str(_rt_execute(client_for(profile), "execute.capture", "sh", "-c", script) or "").strip() - parts = output.split() - if len(parts) < 2: - raise RuntimeError(f"Cannot read remote CPU/RAM usage: {output}") - usage = {"cpu": float(parts[0]), "ram": float(parts[1]), "source": "rtorrent-remote", "usage_source": "rtorrent-remote", "cached": False} - _REMOTE_USAGE_CACHE[profile_id] = (now, usage) - return dict(usage) - - -def _usage_dict(total: int, used: int, free: int) -> dict: - total = max(0, int(total or 0)) - used = max(0, int(used or 0)) - free = max(0, int(free or 0)) - pct = round((used / total) * 100, 1) if total else 0.0 - return { - "ok": True, - "total": total, - "used": used, - "free": free, - "total_h": human_size(total), - "used_h": human_size(used), - "free_h": human_size(free), - "percent": pct, - } - - -def _statvfs_usage(path: str) -> dict: - stat = os.statvfs(path) - total = int(stat.f_blocks * stat.f_frsize) - free = int(stat.f_bavail * stat.f_frsize) - used = max(0, total - free) - return _usage_dict(total, used, free) - - -def _remote_df_usage(profile: dict, path: str) -> dict: - # Note: Disk paths belong to the rTorrent host. Query df through rTorrent so NFS/Btrfs mounts are measured correctly. - clean_path = _remote_clean_path(path or os.sep) - cache_key = f"remote-df:{profile.get('id')}:{clean_path}" - now = time.monotonic() - cached = _DISK_USAGE_CACHE.get(cache_key) - if cached and now - cached[0] < _DISK_USAGE_TTL_SECONDS: - return dict(cached[1]) - script = ( - 'path=$1; ' - 'if [ ! -e "$path" ]; then echo "ERR\tmissing path"; exit 0; fi; ' - 'line=$(df -Pk "$path" 2>/dev/null | tail -n 1); ' - 'if [ -z "$line" ]; then echo "ERR\tdf failed"; exit 0; fi; ' - 'set -- $line; pct=${5%\\%}; ' - 'if [ -z "$2" ] || [ -z "$3" ] || [ -z "$4" ]; then echo "ERR\tdf parse failed"; exit 0; fi; ' - 'printf "OK\t%s\t%s\t%s\t%s\t%s\n" "$2" "$3" "$4" "$pct" "$6"' - ) - output = str(_rt_execute(client_for(profile), "execute.capture", "sh", "-c", script, "pytorrent-df", clean_path) or "").strip() - first_line = output.splitlines()[0] if output else "" - parts = first_line.split("\t") - if len(parts) >= 6 and parts[0] == "OK": - total = int(parts[1]) * 1024 - used = int(parts[2]) * 1024 - free = int(parts[3]) * 1024 - usage = _usage_dict(total, used, free) - usage.update({"path": clean_path, "source_path": parts[5] or clean_path, "fallback": False, "measure_source": "rtorrent-df"}) - else: - error = parts[1] if len(parts) > 1 else (output or "df returned no data") - usage = {"ok": False, "path": clean_path, "source_path": clean_path, "error": error, "percent": 0, "measure_source": "rtorrent-df"} - _DISK_USAGE_CACHE[cache_key] = (now, dict(usage)) - return usage - - -def _disk_usage_for_path(profile: dict, path: str, allow_parent_fallback: bool = False) -> dict: - clean_path = _remote_clean_path(path or os.sep) - try: - return _remote_df_usage(profile, clean_path) - except Exception as remote_exc: - try: - usage = _statvfs_usage(clean_path) - usage.update({"path": clean_path, "source_path": clean_path, "fallback": False, "measure_source": "local-statvfs", "warning": str(remote_exc)}) - return usage - except Exception as first_exc: - usage = {"ok": False, "path": clean_path, "source_path": clean_path, "error": str(first_exc), "warning": str(remote_exc), "percent": 0} - if not allow_parent_fallback: - return usage - probe = os.path.abspath(clean_path or os.sep) - seen = set() - while probe and probe not in seen: - seen.add(probe) - parent = os.path.dirname(probe) - if parent == probe: - break - probe = parent - try: - usage = _statvfs_usage(probe) - usage.update({"path": clean_path, "source_path": probe, "fallback": True, "measure_source": "local-statvfs", "warning": str(first_exc)}) - break - except Exception: - continue - return usage - - -def disk_usage_for_default_path(profile: dict) -> dict: - """Filesystem usage for the rTorrent default download directory.""" - path = default_download_path(profile) - cache_key = f"default-disk:{profile.get('id')}:{path}" - now = time.monotonic() - cached = _DISK_USAGE_CACHE.get(cache_key) - if cached and now - cached[0] < _DISK_USAGE_TTL_SECONDS: - return dict(cached[1]) - usage = _disk_usage_for_path(profile, path, allow_parent_fallback=True) - _DISK_USAGE_CACHE[cache_key] = (now, dict(usage)) - return usage - - -def disk_usage_for_paths(profile: dict, paths: list[str] | None = None, mode: str = 'default', selected_path: str = '') -> dict: - # Note: Aggregate/selected modes measure exact user paths on the rTorrent host; they do not fall back to parent/root partitions. - default_path = default_download_path(profile) - mode = mode if mode in {'default', 'selected', 'aggregate'} else 'default' - user_paths: list[str] = [] - for item in paths or []: - path = _remote_clean_path(str(item or '').strip()) - if path and path not in user_paths: - user_paths.append(path) - selected_path = _remote_clean_path(str(selected_path or '').strip()) - if mode == 'selected': - source_paths = [selected_path] if selected_path else list(user_paths) - elif mode == 'aggregate': - source_paths = list(user_paths) - else: - source_paths = [default_path] - if mode in {'selected', 'aggregate'} and not source_paths: - source_paths = [default_path] - clean_paths: list[str] = [] - for item in source_paths: - path = _remote_clean_path(str(item or '').strip()) - if path and path not in clean_paths: - clean_paths.append(path) - entries = [_disk_usage_for_path(profile, path, allow_parent_fallback=(mode == 'default')) for path in clean_paths] - chosen = entries[0] if entries else _disk_usage_for_path(profile, default_path, allow_parent_fallback=True) - if mode == 'selected' and selected_path: - chosen = next((x for x in entries if x.get('path') == selected_path), chosen) - elif mode == 'aggregate': - ok_entries = [x for x in entries if x.get('ok')] - total = sum(int(x.get('total') or 0) for x in ok_entries) - used = sum(int(x.get('used') or 0) for x in ok_entries) - free = sum(int(x.get('free') or 0) for x in ok_entries) - chosen = _usage_dict(total, used, free) if ok_entries else {"ok": False, "total": 0, "used": 0, "free": 0, "total_h": "0 B", "used_h": "0 B", "free_h": "0 B", "percent": 0} - chosen.update({'path': 'aggregate', 'source_path': 'aggregate', 'fallback': False, 'measure_source': 'rtorrent-df'}) - chosen = dict(chosen) - chosen['mode'] = mode - chosen['paths'] = entries - return chosen - - -def _safe_rtorrent_int(callable_obj, default=None): - """Return an integer rTorrent metric without failing the whole status poll.""" - try: - value = callable_obj() - return int(value) - except Exception: - return default - - -def _safe_rtorrent_time(c): - """Read rTorrent server time when supported; otherwise let the browser clock remain authoritative.""" - candidates = ( - lambda: c.system.time_seconds(), - lambda: c.system.time(), - ) - for candidate in candidates: - value = _safe_rtorrent_int(candidate) - if value: - return value - return None - -def system_status(profile: dict) -> dict: - c = client_for(profile) - version = str(c.system.client_version()) - try: - down_limit = int(c.throttle.global_down.max_rate()) - except Exception: - down_limit = 0 - try: - up_limit = int(c.throttle.global_up.max_rate()) - except Exception: - up_limit = 0 - rows = list_torrents(profile) - # Note: ruTorrent-style footer metrics. Missing XMLRPC methods are shown as unavailable instead of breaking polling. - open_sockets = _safe_rtorrent_int(lambda: c.network.open_sockets()) - max_open_sockets = _safe_rtorrent_int(lambda: c.network.max_open_sockets()) - rtorrent_time = _safe_rtorrent_time(c) - checking_count = sum(1 for t in rows if t.get("status") == "Checking" or int(t.get("hashing") or 0) > 0) - return { - "ok": True, - "version": version, - "total": len(rows), - "active": sum(1 for t in rows if t["state"]), - "seeding": sum(1 for t in rows if t["complete"] and t["state"] and not t.get("paused")), - "leeching": sum(1 for t in rows if not t["complete"] and t["state"] and not t.get("paused") and t.get("status") != "Checking"), - "checking": checking_count, - "paused": sum(1 for t in rows if t.get("paused")), - "stopped": sum(1 for t in rows if not t["state"]), - "down_rate": sum(t["down_rate"] for t in rows), - "down_rate_h": human_rate(sum(t["down_rate"] for t in rows)), - "up_rate": sum(t["up_rate"] for t in rows), - "up_rate_h": human_rate(sum(t["up_rate"] for t in rows)), - "down_limit": down_limit, - "up_limit": up_limit, - "down_limit_h": human_rate(down_limit) if down_limit else "∞", - "up_limit_h": human_rate(up_limit) if up_limit else "∞", - "total_down": sum(t["down_total"] for t in rows), - "total_up": sum(t["up_total"] for t in rows), - "total_down_h": human_size(sum(t["down_total"] for t in rows)), - "total_up_h": human_size(sum(t["up_total"] for t in rows)), - "open_sockets": open_sockets, - "max_open_sockets": max_open_sockets, - "rtorrent_time": rtorrent_time, - "disk": disk_usage_for_default_path(profile), - } - - -def scgi_diagnostics(profile: dict) -> dict: - c = client_for(profile) - started = time.perf_counter() - body = dumps((), methodname="system.client_version", allow_none=True).encode("utf-8") - headers = { - "CONTENT_LENGTH": str(len(body)), - "SCGI": "1", - "REQUEST_METHOD": "POST", - "REQUEST_URI": c.path, - "SCRIPT_NAME": c.path, - "SERVER_PROTOCOL": "HTTP/1.1", - "CONTENT_TYPE": "text/xml", - } - header_blob = b"".join(k.encode() + b"\0" + v.encode() + b"\0" for k, v in headers.items()) - payload = str(len(header_blob)).encode("ascii") + b":" + header_blob + b"," + body - metrics = { - "url": profile.get("scgi_url"), - "host": c.host, - "port": c.port, - "path": c.path, - "timeout_seconds": c.timeout, - "request_bytes": len(payload), - } - connect_started = time.perf_counter() - with socket.create_connection((c.host, c.port), timeout=c.timeout) as sock: - sock.settimeout(c.timeout) - metrics["connect_ms"] = round((time.perf_counter() - connect_started) * 1000, 2) - send_started = time.perf_counter() - sock.sendall(payload) - metrics["send_ms"] = round((time.perf_counter() - send_started) * 1000, 2) - chunks: list[bytes] = [] - first_byte_at = None - while True: - chunk = sock.recv(65536) - if chunk and first_byte_at is None: - first_byte_at = time.perf_counter() - if not chunk: - break - chunks.append(chunk) - response = b"".join(chunks) - metrics["response_bytes"] = len(response) - metrics["first_byte_ms"] = round(((first_byte_at or time.perf_counter()) - started) * 1000, 2) - metrics["total_ms"] = round((time.perf_counter() - started) * 1000, 2) - if not response: - raise ConnectionError("Empty response from rTorrent SCGI") - xml_response = response - if b"\r\n\r\n" in xml_response: - xml_response = xml_response.split(b"\r\n\r\n", 1)[1] - elif b"\n\n" in xml_response: - xml_response = xml_response.split(b"\n\n", 1)[1] - result, _ = loads(xml_response) - metrics["xml_bytes"] = len(xml_response) - metrics["client_version"] = str(result[0]) if result else "" - metrics["ok"] = True - return metrics - - -def torrent_files(profile: dict, torrent_hash: str) -> list[dict]: - rows = client_for(profile).f.multicall(torrent_hash, "", "f.path=", "f.size_bytes=", "f.completed_chunks=", "f.size_chunks=", "f.priority=") - files = [] - for idx, r in enumerate(rows): - size = int(r[1] or 0) - completed_chunks = int(r[2] or 0) - size_chunks = int(r[3] or 0) - progress = 100.0 if size <= 0 else round((completed_chunks / size_chunks) * 100, 2) if size_chunks else 0.0 - files.append({ - "index": idx, - "path": r[0], - "size": size, - "size_h": human_size(size), - "completed_chunks": completed_chunks, - "size_chunks": size_chunks, - "progress": min(100.0, max(0.0, progress)), - "priority": int(r[4] or 0), - }) - return files - - -def torrent_file_tree(profile: dict, torrent_hash: str) -> dict: - # Note: The tree is built from rTorrent file paths without changing the existing flat file API. - root = {"name": "", "path": "", "type": "directory", "size": 0, "children": {}} - for item in torrent_files(profile, torrent_hash): - parts = [part for part in str(item.get("path") or "").split("/") if part] - node = root - prefix: list[str] = [] - for part in parts[:-1]: - prefix.append(part) - children = node.setdefault("children", {}) - node = children.setdefault(part, {"name": part, "path": "/".join(prefix), "type": "directory", "size": 0, "children": {}}) - name = parts[-1] if parts else str(item.get("path") or f"file-{item.get('index')}") - child = dict(item) - child.update({"name": name, "type": "file"}) - node.setdefault("children", {})[name] = child - def finalize(node: dict) -> dict: - if node.get("type") == "file": - return node - children = [finalize(v) for v in node.get("children", {}).values()] - children.sort(key=lambda x: (x.get("type") != "directory", str(x.get("name") or "").lower())) - node["children"] = children - node["size"] = sum(int(c.get("size") or 0) for c in children) - node["size_h"] = human_size(node["size"]) - return node - return finalize(root) - - - -def _torrent_file_remote_path(profile: dict, torrent_hash: str, index: int) -> tuple[dict, str]: - c = client_for(profile) - files = torrent_files(profile, torrent_hash) - selected = next((f for f in files if int(f.get("index", -1)) == int(index)), None) - if selected is None: - available = ", ".join(str(f.get("index")) for f in files[:20]) or "none" - raise ValueError(f"File index {index} not found. Available indexes: {available}") - base = _remote_clean_path(_torrent_data_path(c, torrent_hash)) - rel = str(selected.get("path") or "").lstrip("/") - if len(files) == 1 and base and not base.endswith("/"): - path = base - else: - path = _remote_join(base, rel) - return selected, path - - -def download_tmp_dir() -> str: - PYTORRENT_TMP_DIR.mkdir(parents=True, exist_ok=True) - return str(PYTORRENT_TMP_DIR) - - -def _remote_readability_error(c: ScgiRtorrentClient, source_path: str) -> str | None: - script = ( - 'p=$1; ' - 'command -v base64 >/dev/null 2>&1 || { echo "base64 command not found on rTorrent host"; exit 0; }; ' - '[ -e "$p" ] || { echo "source file does not exist"; exit 0; }; ' - '[ -f "$p" ] || { echo "source path is not a regular file"; exit 0; }; ' - '[ -r "$p" ] || { echo "source file is not readable by rTorrent"; exit 0; }; ' - 'echo OK' - ) - output = str(_rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-download-check", source_path) or "").strip() - return None if output == "OK" else (output or "source file cannot be read by rTorrent") - - -def remote_file_readability_error(profile: dict, source_path: str) -> str | None: - return _remote_readability_error(client_for(profile), source_path) - - -def iter_remote_file_chunks(profile: dict, source_path: str, size: int | None = None, chunk_size: int | None = None): - c = client_for(profile) - clean = _remote_clean_path(source_path) - err = _remote_readability_error(c, clean) - if err: - raise RuntimeError(err) - block_size = max(65536, int(chunk_size or REMOTE_READ_CHUNK_BYTES or 1048576)) - offset = 0 - emitted = 0 - script = ( - 'p=$1; bs=$2; skip=$3; ' - 'command -v base64 >/dev/null 2>&1 || { printf "ERR\tbase64 command not found on rTorrent host"; exit 0; }; ' - '[ -r "$p" ] || { printf "ERR\tsource file is not readable by rTorrent"; exit 0; }; ' - 'dd if="$p" bs="$bs" skip="$skip" count=1 2>/dev/null | base64 | tr -d "\n"' - ) - while size is None or emitted < int(size): - output = str(_rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-download-read", clean, str(block_size), str(offset)) or "") - if output.startswith("ERR\t"): - raise RuntimeError(output.split("\t", 1)[1] or "remote read failed") - if not output: - break - try: - chunk = __import__("base64").b64decode(output, validate=False) - except Exception as exc: - raise RuntimeError(f"remote read returned invalid base64: {exc}") from exc - if not chunk: - break - yield chunk - emitted += len(chunk) - offset += 1 - if size is not None and emitted >= int(size): - break - - -def torrent_download_file_info(profile: dict, torrent_hash: str, index: int) -> dict: - selected, remote_path = _torrent_file_remote_path(profile, torrent_hash, index) - err = remote_file_readability_error(profile, remote_path) - if err: - raise RuntimeError(err) - return {**selected, "remote_path": remote_path, "download_name": LocalPath(str(selected.get("path") or remote_path)).name} - - -def torrent_download_zip_items(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> list[dict]: - files = torrent_files(profile, torrent_hash) - wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files} - items = [] - for item in files: - if int(item.get("index", -1)) not in wanted: - continue - _, remote_path = _torrent_file_remote_path(profile, torrent_hash, int(item["index"])) - err = remote_file_readability_error(profile, remote_path) - if err: - raise RuntimeError(f"{item.get('path') or item.get('index')}: {err}") - items.append({**item, "remote_path": remote_path}) - if not items: - raise ValueError("No files selected") - return items - - -def _remote_stage_path(c: ScgiRtorrentClient, source_path: str, suffix: str = "") -> str: - token = uuid.uuid4().hex - safe_suffix = ''.join(ch if ch.isalnum() or ch in '.-_' else '_' for ch in str(suffix or ''))[:80] - target = f"{download_tmp_dir().rstrip('/')}/pytorrent-download-{token}{safe_suffix}" - script = ( - 'src=$1; dst=$2; ' - 'if [ ! -f "$src" ]; then echo "ERR\tmissing source"; exit 0; fi; ' - 'cp -- "$src" "$dst" 2>/tmp/pytorrent-cp-err-$$ || { rc=$?; err=$(cat /tmp/pytorrent-cp-err-$$ 2>/dev/null); rm -f /tmp/pytorrent-cp-err-$$; printf "ERR\t%s\t%s\n" "$rc" "$err"; exit 0; }; ' - 'rm -f /tmp/pytorrent-cp-err-$$; chmod 0644 "$dst" 2>/dev/null || true; printf "OK\t%s\n" "$dst"' - ) - output = str(_rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-stage-file", source_path, target) or "").strip() - parts = (output.splitlines()[0] if output else "").split("\t", 2) - if len(parts) >= 2 and parts[0] == "OK": - return parts[1] - detail = parts[2] if len(parts) > 2 else (parts[1] if len(parts) > 1 else output) - raise RuntimeError(detail or "Cannot stage file through rTorrent") - - -def _remote_stage_zip(c: ScgiRtorrentClient, files: list[dict], suffix: str = ".zip") -> str: - if not files: - raise ValueError("No files selected") - token = uuid.uuid4().hex - tmp_base = download_tmp_dir().rstrip("/") - list_path = f"{tmp_base}/pytorrent-zip-list-{token}.txt" - zip_path = f"{tmp_base}/pytorrent-download-{token}{suffix}" - lines = [] - for item in files: - src = str(item.get("remote_path") or "") - arc = str(item.get("path") or LocalPath(src).name).lstrip("/") or LocalPath(src).name - lines.append(src.replace("\t", " ") + "\t" + arc.replace("\t", " ")) - list_data = "\n".join(lines) - script = ( - 'list=$1; zip=$2; data=$3; umask 022; printf "%s\n" "$data" > "$list"; ' - 'rm -f "$zip"; tmpdir=$(mktemp -d /tmp/pytorrent-zip-XXXXXX) || exit 3; ' - 'rc=0; while IFS=$(printf "\\t") read -r src arc; do ' - '[ -n "$src" ] || continue; ' - 'if [ ! -f "$src" ]; then echo "missing source: $src" >&2; rc=4; break; fi; ' - 'case "$arc" in /*|../*|*/../*) echo "unsafe zip path: $arc" >&2; rc=5; break;; esac; ' - 'dir=${arc%/*}; if [ "$dir" != "$arc" ]; then mkdir -p "$tmpdir/$dir" || { rc=$?; break; }; fi; cp -- "$src" "$tmpdir/$arc" || { rc=$?; break; }; ' - 'done; if [ $rc -eq 0 ]; then (cd "$tmpdir" && zip -qr "$zip" .) || rc=$?; fi; ' - 'rm -rf "$tmpdir" "$list"; ' - 'if [ $rc -eq 0 ] && [ -f "$zip" ]; then chmod 0644 "$zip" 2>/dev/null || true; printf "OK\t%s\n" "$zip"; else printf "ERR\t%s\n" "$rc"; fi' - ) - output = str(_rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-stage-zip", list_path, zip_path, list_data) or "").strip() - parts = (output.splitlines()[0] if output else "").split("\t", 1) - if len(parts) == 2 and parts[0] == "OK": - return parts[1] - raise RuntimeError(output or "Cannot create ZIP through rTorrent") - - -def _remote_remove_staged(profile: dict, path: str) -> None: - clean = str(path or "") - tmp_prefix = download_tmp_dir().rstrip("/") + "/pytorrent-download-" - if not clean.startswith(tmp_prefix): - return - try: - _rt_execute(client_for(profile), "execute.throw", "rm", "-f", clean) - except Exception: - pass - - -def torrent_staged_file_path(profile: dict, torrent_hash: str, index: int) -> dict: - c = client_for(profile) - selected, remote_path = _torrent_file_remote_path(profile, torrent_hash, index) - suffix = LocalPath(str(selected.get("path") or "file")).suffix - staged = _remote_stage_path(c, remote_path, suffix) - return {**selected, "remote_path": remote_path, "staged_path": staged, "download_name": LocalPath(str(selected.get("path") or staged)).name} - - -def torrent_staged_zip_path(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> dict: - c = client_for(profile) - files = torrent_files(profile, torrent_hash) - wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files} - items = [] - for item in files: - if int(item.get("index", -1)) not in wanted: - continue - _, remote_path = _torrent_file_remote_path(profile, torrent_hash, int(item["index"])) - items.append({**item, "remote_path": remote_path}) - staged = _remote_stage_zip(c, items) - return {"staged_path": staged, "count": len(items)} - - -def _torrent_raw_from_method(c: ScgiRtorrentClient, torrent_hash: str) -> bytes | None: - for method in ("d.get_metafile", "d.metafile"): - try: - value = c.call(method, torrent_hash) - except Exception: - continue - if hasattr(value, "data"): - data = value.data - elif isinstance(value, bytes): - data = value - elif isinstance(value, str): - data = value.encode("latin-1", "ignore") - else: - data = None - if data: - return bytes(data) - return None - - -def _torrent_source_file(c: ScgiRtorrentClient, torrent_hash: str) -> str: - for method in ("d.tied_to_file", "d.get_tied_to_file", "d.loaded_file", "d.get_loaded_file", "d.session_file", "d.get_session_file"): - try: - value = str(c.call(method, torrent_hash) or "").strip() - except Exception: - continue - if value: - return value - return "" - - -def export_torrent_file(profile: dict, torrent_hash: str) -> dict: - c = client_for(profile) - name = str(c.call("d.name", torrent_hash) or torrent_hash).strip() or torrent_hash - filename = f"{name}.torrent" if not name.lower().endswith(".torrent") else name - raw = _torrent_raw_from_method(c, torrent_hash) - if raw: - target = LocalPath(download_tmp_dir()) / f"pytorrent-download-{uuid.uuid4().hex}.torrent" - target.write_bytes(raw) - return {"path": str(target), "download_name": filename, "local": True} - source = _torrent_source_file(c, torrent_hash) - if not source: - raise RuntimeError("Cannot find torrent source file in rTorrent") - staged = _remote_stage_path(c, source, ".torrent") - return {"path": staged, "download_name": filename, "local": False} - -def set_folder_priority(profile: dict, torrent_hash: str, folder_path: str, priority: int) -> dict: - # Note: Folder priority applies the same rTorrent file priority to every descendant path. - folder = str(folder_path or "").strip().strip("/") - updates = [] - for item in torrent_files(profile, torrent_hash): - path = str(item.get("path") or "").strip("/") - if not folder or path == folder or path.startswith(folder + "/"): - updates.append({"index": item["index"], "priority": int(priority)}) - if not updates: - return {"updated": [], "errors": [{"folder": folder_path, "error": "No files matched folder"}]} - return set_file_priorities(profile, torrent_hash, updates) - - -def torrent_local_file_path(profile: dict, torrent_hash: str, index: int) -> str: - c = client_for(profile) - files = torrent_files(profile, torrent_hash) - selected = next((f for f in files if int(f.get("index", -1)) == int(index)), None) - if not selected: - raise ValueError("File index not found") - base = _remote_clean_path(_torrent_data_path(c, torrent_hash)) - rel = str(selected.get("path") or "").lstrip("/") - if len(files) == 1 and base and not base.endswith("/"): - path = base - else: - path = _remote_join(base, rel) - # Note: HTTP file serving is enabled only for local profiles to avoid pretending remote files exist locally. - if int(profile.get("is_remote") or 0): - raise ValueError("HTTP file download is available only for local rTorrent profiles") - local = LocalPath(path).resolve() - if not local.exists() or not local.is_file(): - raise FileNotFoundError(f"Local file is not available: {local}") - return str(local) - - -def torrent_local_file_paths(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> list[dict]: - files = torrent_files(profile, torrent_hash) - wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files} - out = [] - for item in files: - if int(item.get("index", -1)) not in wanted: - continue - out.append({**item, "local_path": torrent_local_file_path(profile, torrent_hash, int(item["index"]))}) - return out - -def torrent_peers(profile: dict, torrent_hash: str) -> list[dict]: - fields = [ - "p.address=", "p.client_version=", "p.completed_percent=", "p.down_rate=", - "p.up_rate=", "p.port=", "p.is_encrypted=", "p.is_incoming=", - "p.is_snubbed=", "p.is_banned=", - ] - try: - rows = client_for(profile).p.multicall(torrent_hash, "", *fields) - except Exception: - fields = ["p.address=", "p.client_version=", "p.completed_percent=", "p.down_rate=", "p.up_rate=", "p.port=", "p.is_encrypted="] - rows = client_for(profile).p.multicall(torrent_hash, "", *fields) - peers = [] - for idx, r in enumerate(rows): - peers.append({ - "index": idx, - "ip": r[0], - "client": r[1], - "completed": int(r[2] or 0), - "down_rate": int(r[3] or 0), - "down_rate_h": human_rate(r[3] or 0), - "up_rate": int(r[4] or 0), - "up_rate_h": human_rate(r[4] or 0), - "port": int(r[5] or 0), - "encrypted": bool(r[6]) if len(r) > 6 else False, - "incoming": bool(r[7]) if len(r) > 7 else False, - "snubbed": bool(r[8]) if len(r) > 8 else False, - "banned": bool(r[9]) if len(r) > 9 else False, - }) - return peers - - - - -def _call_first(c: ScgiRtorrentClient, candidates: list[tuple[str, tuple]]) -> dict: - errors = [] - for method, args in candidates: - try: - result = c.call(method, *args) - return {"ok": True, "method": method, "result": result} - except Exception as exc: - errors.append(f"{method}: {exc}") - raise RuntimeError("; ".join(errors)) - - - -def _tracker_domain(url: str) -> str: - raw = str(url or '').strip() - if not raw: - return '' - parsed = urlparse(raw if '://' in raw else f'http://{raw}') - host = (parsed.hostname or '').lower().strip('.') - if host.startswith('www.'): - host = host[4:] - return host - - -def tracker_summary(profile: dict, torrent_hashes: list[str] | None = None, limit: int = 1000) -> dict: - """Return tracker domains grouped by torrent for the sidebar filter.""" - # Note: Tracker summary is read-only and isolated from the normal torrent snapshot, so slow tracker RPC calls cannot break the main list. - hashes = [str(h or '').strip() for h in (torrent_hashes or []) if str(h or '').strip()] - if not hashes: - hashes = [t.get('hash') for t in list_torrents(profile) if t.get('hash')] - hashes = hashes[:max(1, int(limit or 1000))] - by_hash: dict[str, list[dict]] = {} - counts: dict[str, dict] = {} - errors = [] - for h in hashes: - try: - items = [] - seen = set() - for tr in torrent_trackers(profile, h): - url = str(tr.get('url') or '') - domain = _tracker_domain(url) - if not domain or domain in seen: - continue - seen.add(domain) - item = {'domain': domain, 'url': url} - items.append(item) - row = counts.setdefault(domain, {'domain': domain, 'url': url, 'count': 0}) - row['count'] += 1 - by_hash[h] = items - except Exception as exc: - errors.append({'hash': h, 'error': str(exc)}) - by_hash[h] = [] - trackers = sorted(counts.values(), key=lambda x: (-int(x.get('count') or 0), str(x.get('domain') or ''))) - return {'hashes': by_hash, 'trackers': trackers, 'errors': errors, 'scanned': len(hashes)} - -def _safe_tracker_call(c: ScgiRtorrentClient, method: str, target: str, default=None): - try: - return c.call(method, target) - except Exception: - return default - - -def _tracker_target(torrent_hash: str, index: int) -> str: - return f"{torrent_hash}:t{int(index)}" - -def _tracker_int(value, default=None): - try: - if value is None or value == "": - return default - return int(value) - except Exception: - return default - - -def _tracker_rows(c: ScgiRtorrentClient, torrent_hash: str) -> list[list]: - fields = ("t.url=", "t.is_enabled=", "t.scrape_complete=", "t.scrape_incomplete=", "t.scrape_downloaded=") - errors: list[str] = [] - for args in ((torrent_hash, "", *fields), ("", torrent_hash, *fields)): - try: - rows = c.call("t.multicall", *args) - return [list(r) for r in (rows or [])] - except Exception as exc: - errors.append(f"t.multicall{args[:2]}: {exc}") - # Note: Fallback keeps the sidebar tracker filter usable on rTorrent builds without t.multicall scrape fields. - total = _tracker_int(_safe_tracker_call(c, "d.tracker_size", torrent_hash, 0), 0) or 0 - rows: list[list] = [] - for index in range(max(0, total)): - target = _tracker_target(torrent_hash, index) - url = _safe_tracker_call(c, "t.url", target, "") - if not url: - for args in ((torrent_hash, index), ("", torrent_hash, index)): - try: - url = c.call("t.url", *args) - break - except Exception: - continue - if url: - enabled = _safe_tracker_call(c, "t.is_enabled", target, 1) - rows.append([url, enabled, None, None, None]) - if rows: - return rows - raise RuntimeError("Cannot read trackers: " + "; ".join(errors)) - - -def torrent_trackers(profile: dict, torrent_hash: str) -> list[dict]: - c = client_for(profile) - rows = _tracker_rows(c, torrent_hash) - trackers = [] - for idx, r in enumerate(rows): - target = _tracker_target(torrent_hash, idx) - last_announce = _safe_tracker_call(c, "t.activity_time_last", target, 0) - scrape_time = _safe_tracker_call(c, "t.scrape_time_last", target, 0) - if not last_announce: - last_announce = scrape_time - next_announce = _safe_tracker_call(c, "t.activity_time_next", target, 0) - raw_seeds = _tracker_int(r[2], None) - raw_peers = _tracker_int(r[3], None) - raw_downloaded = _tracker_int(r[4], None) - has_scrape = bool(_tracker_int(scrape_time, 0)) or raw_seeds not in (None, 0) or raw_peers not in (None, 0) or raw_downloaded not in (None, 0) - trackers.append({ - "index": idx, - "url": str(r[0] or ""), - "enabled": bool(r[1]), - "seeds": raw_seeds if has_scrape else None, - "peers": raw_peers if has_scrape else None, - "downloaded": raw_downloaded if has_scrape else None, - "has_scrape": has_scrape, - "last_announce": int(last_announce or 0), - "next_announce": int(next_announce or 0), - }) - return trackers - -def tracker_action(profile: dict, torrent_hash: str, action_name: str, payload: dict | None = None) -> dict: - payload = payload or {} - c = client_for(profile) - if action_name == "reannounce": - return _call_first(c, [ - ("d.tracker_announce", (torrent_hash,)), - ("d.tracker_announce", ("", torrent_hash)), - ("d.tracker_announce.force", (torrent_hash,)), - ]) - if action_name == "add": - url = str(payload.get("url") or "").strip() - if not url: - raise ValueError("Missing tracker URL") - return _call_first(c, [ - ("d.tracker.insert", (torrent_hash, "", url)), - ("d.tracker.insert", (torrent_hash, 0, url)), - ("d.tracker.insert", ("", torrent_hash, "", url)), - ]) - if action_name in {"delete", "remove"}: - # Note: Deleting trackers is guarded to keep at least one tracker attached to the torrent. - index = int(payload.get("index", -1)) - if index < 0: - raise ValueError("Invalid tracker index") - total = _tracker_int(_safe_tracker_call(c, "d.tracker_size", torrent_hash, 0), 0) or len(torrent_trackers(profile, torrent_hash)) - if total <= 1: - raise ValueError("Cannot delete the last tracker") - if index >= total: - raise ValueError("Invalid tracker index") - return _call_first(c, [ - ("d.tracker.remove", (torrent_hash, index)), - ("d.tracker.remove", (torrent_hash, "", index)), - ("d.tracker.erase", (torrent_hash, index)), - ("d.tracker.erase", (torrent_hash, "", index)), - ("d.tracker.delete", (torrent_hash, index)), - ("d.tracker.delete", (torrent_hash, "", index)), - ]) - raise ValueError(f"Unknown tracker action: {action_name}") - - -RTORRENT_CONFIG_FIELDS = [ - {"group": "Directories", "key": "directory.default", "label": "Default download directory", "type": "text"}, - {"group": "Directories", "key": "session.path", "label": "Session path", "type": "text"}, - {"group": "Directories", "key": "system.cwd", "label": "Working directory", "type": "text", "readonly": True}, - {"group": "Network", "key": "network.port_range", "label": "Incoming port range", "type": "text", "placeholder": "49164-49164"}, - {"group": "Network", "key": "network.port_random", "label": "Random incoming port", "type": "bool"}, - {"group": "Network", "key": "network.bind_address", "label": "Bind address", "type": "text", "placeholder": "0.0.0.0"}, - {"group": "Network", "key": "network.local_address", "label": "Local address", "type": "text"}, - {"group": "Network", "key": "network.max_open_files", "label": "Max open files", "type": "number"}, - {"group": "Network", "key": "network.max_open_sockets", "label": "Max open sockets", "type": "number"}, - {"group": "Network", "key": "network.http.max_open", "label": "Max HTTP connections", "type": "number"}, - {"group": "Network", "key": "network.http.ssl_verify_peer", "label": "Verify SSL peers", "type": "bool"}, - {"group": "Peers", "key": "throttle.min_peers.normal", "label": "Min peers downloading", "type": "number"}, - {"group": "Peers", "key": "throttle.max_peers.normal", "label": "Max peers downloading", "type": "number"}, - {"group": "Peers", "key": "throttle.min_peers.seed", "label": "Min peers seeding", "type": "number"}, - {"group": "Peers", "key": "throttle.max_peers.seed", "label": "Max peers seeding", "type": "number"}, - {"group": "Peers", "key": "trackers.numwant", "label": "Tracker numwant", "type": "number"}, - {"group": "Throttle", "key": "throttle.global_down.max_rate", "label": "Global download limit B/s", "type": "number"}, - {"group": "Throttle", "key": "throttle.global_up.max_rate", "label": "Global upload limit B/s", "type": "number"}, - {"group": "Throttle", "key": "throttle.max_downloads.global", "label": "Max active downloads", "type": "number"}, - {"group": "Throttle", "key": "throttle.max_uploads.global", "label": "Max active uploads", "type": "number"}, - {"group": "Throttle", "key": "throttle.max_downloads.div", "label": "Max downloads per throttle", "type": "number"}, - {"group": "Throttle", "key": "throttle.max_uploads.div", "label": "Max uploads per throttle", "type": "number"}, - {"group": "DHT / PEX", "key": "dht.mode", "label": "DHT mode", "type": "text", "placeholder": "disable/off/auto/on"}, - {"group": "DHT / PEX", "key": "dht.port", "label": "DHT port", "type": "number"}, - {"group": "DHT / PEX", "key": "protocol.pex", "label": "Peer exchange", "type": "bool"}, - {"group": "Protocol", "key": "protocol.encryption.set", "label": "Encryption flags", "type": "text", "placeholder": "allow_incoming,try_outgoing,enable_retry"}, - {"group": "Protocol", "key": "protocol.connection.leech", "label": "Leech connection type", "type": "text", "placeholder": "leech"}, - {"group": "Protocol", "key": "protocol.connection.seed", "label": "Seed connection type", "type": "text", "placeholder": "seed"}, - {"group": "Files", "key": "pieces.hash.on_completion", "label": "Hash check on completion", "type": "bool"}, - {"group": "Files", "key": "pieces.preload.type", "label": "Pieces preload type", "type": "number"}, - {"group": "Files", "key": "pieces.preload.min_size", "label": "Pieces preload min size", "type": "number"}, - {"group": "Files", "key": "pieces.preload.min_rate", "label": "Pieces preload min rate", "type": "number"}, - {"group": "Files", "key": "system.file.allocate", "label": "File allocation", "type": "number"}, - {"group": "Files", "key": "system.file.max_size", "label": "Max file size", "type": "number"}, - {"group": "System", "key": "system.umask", "label": "File umask", "type": "text", "placeholder": "0002"}, - {"group": "System", "key": "system.hostname", "label": "Hostname", "type": "text", "readonly": True}, - {"group": "System", "key": "system.client_version", "label": "Client version", "type": "text", "readonly": True}, - {"group": "System", "key": "system.library_version", "label": "Library version", "type": "text", "readonly": True}, -] - - -def _normalize_config_value(meta: dict, value): - if meta.get("type") == "bool": - return "1" if str(value).lower() in {"1", "true", "yes", "on"} or value is True else "0" - if meta.get("type") == "number": - return str(int(value or 0)) - return str(value or "").strip() - - -def saved_config_overrides(profile_id: int, user_id: int | None = None) -> dict[str, dict]: - user_id = user_id or default_user_id() - with connect() as conn: - rows = conn.execute( - "SELECT key,value,baseline_value,apply_on_start,updated_at FROM rtorrent_config_overrides WHERE user_id=? AND profile_id=?", - (user_id, int(profile_id)), - ).fetchall() - return {r["key"]: r for r in rows} - - -def get_config(profile: dict) -> dict: - c = client_for(profile) - saved = saved_config_overrides(int(profile["id"])) - fields = [] - for meta in RTORRENT_CONFIG_FIELDS: - item = dict(meta) - saved_item = saved.get(meta["key"]) - try: - item["value"] = _normalize_config_value(meta, c.call(meta["key"])) - item["current_value"] = item["value"] - item["ok"] = True - except Exception as exc: - item["value"] = "" - item["current_value"] = "" - item["ok"] = False - item["error"] = str(exc) - if saved_item: - saved_value = _normalize_config_value(meta, saved_item.get("value")) - baseline_raw = saved_item.get("baseline_value") - if baseline_raw not in (None, ""): - baseline_value = _normalize_config_value(meta, baseline_raw) - else: - baseline_value = _normalize_config_value(meta, item.get("current_value")) - item["saved"] = True - item["saved_value"] = saved_value - item["baseline_value"] = baseline_value - item["apply_on_start"] = bool(saved_item.get("apply_on_start")) - item["changed"] = saved_value != baseline_value - fields.append(item) - return {"fields": fields, "apply_on_start": any(bool(v.get("apply_on_start")) for v in saved.values())} - - - -def default_download_path(profile: dict) -> str: - """Return rTorrent default download directory for the active profile.""" - c = client_for(profile) - errors = [] - for method in ("directory.default", "system.cwd"): - try: - value = str(c.call(method) or "").strip() - if value: - return value - except Exception as exc: - errors.append(f"{method}: {exc}") - raise RuntimeError("Cannot read rTorrent default download directory: " + "; ".join(errors)) - -def generate_config_text(values: dict) -> str: - known = {f["key"]: f for f in RTORRENT_CONFIG_FIELDS} - lines = [] - for key, value in (values or {}).items(): - meta = known.get(key) - if not meta or meta.get("readonly"): - continue - normalized = _normalize_config_value(meta, value) - if meta.get("type") == "text" and any(ch.isspace() for ch in normalized): - normalized = '"' + normalized.replace('\\', '\\\\').replace('"', '\\"') + '"' - lines.append(f"{key}.set = {normalized}") - return "\n".join(lines) + ("\n" if lines else "") - - -def _read_rtorrent_config_value(client, key: str, meta: dict) -> str: - return _normalize_config_value(meta, client.call(key)) - - -def store_config_overrides(profile: dict, values: dict, apply_on_start: bool, baseline_values: dict | None = None, clear_keys: list[str] | None = None) -> list[str]: - known = {f["key"]: f for f in RTORRENT_CONFIG_FIELDS} - user_id = default_user_id() - now = utcnow() - profile_id = int(profile["id"]) - baseline_values = baseline_values or {} - clear_set = set(clear_keys or []) - stored = [] - with connect() as conn: - for key in clear_set: - if key in known: - conn.execute( - "DELETE FROM rtorrent_config_overrides WHERE user_id=? AND profile_id=? AND key=?", - (user_id, profile_id, key), - ) - for key, value in (values or {}).items(): - if key in clear_set: - continue - meta = known.get(key) - if not meta or meta.get("readonly"): - continue - normalized = _normalize_config_value(meta, value) - existing = conn.execute( - "SELECT baseline_value FROM rtorrent_config_overrides WHERE user_id=? AND profile_id=? AND key=?", - (user_id, profile_id, key), - ).fetchone() - existing_baseline = existing.get("baseline_value") if existing else None - - # Keep the first reference value forever until the override is cleared. - # Without this, a second save could treat already-overridden rTorrent - # values as the new baseline and the UI would stop marking them as changed. - if existing_baseline not in (None, ""): - baseline = _normalize_config_value(meta, existing_baseline) - else: - baseline = _normalize_config_value(meta, baseline_values.get(key)) if key in baseline_values else None - - if baseline not in (None, "") and normalized == baseline: - conn.execute( - "DELETE FROM rtorrent_config_overrides WHERE user_id=? AND profile_id=? AND key=?", - (user_id, profile_id, key), - ) - continue - conn.execute( - "INSERT OR REPLACE INTO rtorrent_config_overrides(user_id,profile_id,key,value,baseline_value,apply_on_start,updated_at) VALUES(?,?,?,?,?,?,?)", - (user_id, profile_id, key, normalized, baseline, 1 if apply_on_start else 0, now), - ) - stored.append(key) - conn.execute( - "UPDATE rtorrent_config_overrides SET apply_on_start=?, updated_at=? WHERE user_id=? AND profile_id=?", - (1 if apply_on_start else 0, now, user_id, profile_id), - ) - return stored - - -def set_config(profile: dict, values: dict, apply_now: bool = True, apply_on_start: bool = False, clear_keys: list[str] | None = None) -> dict: - updated, errors = [], [] - known = {f["key"]: f for f in RTORRENT_CONFIG_FIELDS} - c = client_for(profile) - baseline_values = {} - for key, raw_value in (values or {}).items(): - meta = known.get(key) - if not meta or meta.get("readonly"): - continue - try: - baseline_values[key] = _read_rtorrent_config_value(c, key, meta) - except Exception: - pass - stored = store_config_overrides(profile, values, apply_on_start, baseline_values, clear_keys) - if not apply_now: - return {"ok": True, "updated": [], "stored": stored, "errors": []} - for key, raw_value in (values or {}).items(): - if key not in known: - continue - meta = known[key] - if meta.get("readonly"): - continue - value = _normalize_config_value(meta, raw_value) - rpc_value = int(value) if meta.get("type") in {"bool", "number"} else value - try: - try: - c.call(key + ".set", "", rpc_value) - except Exception: - c.call(key + ".set", rpc_value) - updated.append(key) - except Exception as exc: - errors.append({"key": key, "error": str(exc)}) - return {"ok": not errors, "updated": updated, "stored": stored, "errors": errors} - - -def apply_startup_overrides(profile: dict) -> dict: - rows = saved_config_overrides(int(profile["id"])) - values = {k: v.get("value") for k, v in rows.items() if v.get("apply_on_start")} - if not values: - return {"ok": True, "updated": [], "errors": [], "skipped": True} - return set_config(profile, values, apply_now=True, apply_on_start=True) - - -def _int_rpc(c: ScgiRtorrentClient, method: str, h: str, default: int = 0) -> int: - try: - return int(c.call(method, h) or 0) - except Exception: - return default - - -def _str_rpc(c: ScgiRtorrentClient, method: str, h: str, default: str = '') -> str: - try: - return str(c.call(method, h) or '') - except Exception: - return default - - -def _download_runtime_state(c: ScgiRtorrentClient, h: str) -> dict: - """Read rTorrent state using the native pause model: stopped, paused or active.""" - state = _int_rpc(c, 'd.state', h) - active = _int_rpc(c, 'd.is_active', h) - opened = _int_rpc(c, 'd.is_open', h) - # Note: In rTorrent, pause does not change d.state. Paused means state=1, open=1, active=0. - return { - 'state': state, - 'open': opened, - 'active': active, - 'paused': bool(state and opened and not active), - 'stopped': not bool(state), - 'message': _str_rpc(c, 'd.message', h), - } - - -def pause_hash(c: ScgiRtorrentClient, torrent_hash: str) -> dict: - """Pause an active rTorrent item without stopping or closing it.""" - h = str(torrent_hash or '') - if not h: - return {'hash': h, 'ok': False, 'error': 'missing hash'} - before = _download_runtime_state(c, h) - result = {'hash': h, 'before': before, 'commands': []} - try: - if before.get('stopped'): - # Note: rTorrent does not turn a stopped item into a paused one with d.pause alone. - # First move it out of STOP, then pause it, which matches the expected START -> PAUSE flow. - try: - c.call('d.open', h) - result['commands'].append('d.open') - except Exception as exc: - result.setdefault('ignored_errors', []).append(f'd.open: {exc}') - c.call('d.start', h) - result['commands'].append('d.start') - # Note: Smart Queue frees a slot with d.pause, not d.stop, so later d.resume behaves like ruTorrent. - c.call('d.pause', h) - result['commands'].append('d.pause') - result['after'] = _download_runtime_state(c, h) - result['ok'] = True - except Exception as exc: - result.update({'ok': False, 'error': str(exc), 'after': _download_runtime_state(c, h)}) - return result - - -def stop_hash(c: ScgiRtorrentClient, torrent_hash: str) -> dict: - """Stop an active rTorrent item without using pause semantics.""" - h = str(torrent_hash or '') - if not h: - return {'hash': h, 'ok': False, 'error': 'missing hash'} - before = _download_runtime_state(c, h) - result = {'hash': h, 'before': before, 'commands': []} - if before.get('stopped'): - result.update({'ok': True, 'skipped': 'already_stopped', 'after': before}) - return result - try: - # Note: Smart Queue now enforces the queue with d.stop only; user-paused torrents stay untouched. - c.call('d.stop', h) - result['commands'].append('d.stop') - result['after'] = _download_runtime_state(c, h) - result['ok'] = True - except Exception as exc: - result.update({'ok': False, 'error': str(exc), 'after': _download_runtime_state(c, h)}) - return result - - -def resume_paused_hash(c: ScgiRtorrentClient, torrent_hash: str) -> dict: - """Resume only a paused rTorrent item; never convert it through stop/start.""" - h = str(torrent_hash or '') - if not h: - return {'hash': h, 'ok': False, 'error': 'missing hash'} - before = _download_runtime_state(c, h) - result: dict = {'hash': h, 'before': before, 'commands': []} - if before.get('stopped'): - result.update({'ok': False, 'skipped': 'stopped_not_paused', 'after': before}) - return result - if before.get('active'): - result.update({'ok': True, 'skipped': 'already_active', 'after': before}) - return result - try: - # Note: ruTorrent unpauses with the equivalent of d.resume. Do not add d.start/d.open, - # because those commands belong to Stopped/Open state, not a clean Paused state. - c.call('d.resume', h) - result['commands'].append('d.resume') - result['after'] = _download_runtime_state(c, h) - result['ok'] = True - except Exception as exc: - result.update({'ok': False, 'error': str(exc), 'after': _download_runtime_state(c, h)}) - return result - - -def start_or_resume_hash(c: ScgiRtorrentClient, torrent_hash: str, prefer_start: bool = False) -> dict: - """Start stopped torrents or resume real paused torrents. - - Smart Queue passes prefer_start=True for candidates that were selected as stopped. - This avoids treating rTorrent's intermediate open/inactive state after a check as - a user pause and sending only d.resume, which can leave items pending forever. - """ - h = str(torrent_hash or '') - if not h: - return {'hash': h, 'ok': False, 'error': 'missing hash'} - before = _download_runtime_state(c, h) - result: dict = {'hash': h, 'before': before, 'commands': []} - - if before.get('active'): - result.update({'ok': True, 'skipped': 'already_active', 'after': before}) - return result - - if before.get('paused') and not prefer_start: - # Note: Manual Start keeps the clean pause-to-resume path. Do not classify every - # state=1/active=0 item as paused; after auto-check this can be only a transient - # open/inactive rTorrent state and needs d.open + d.start. - resumed = resume_paused_hash(c, h) - resumed['mode'] = 'resume_paused' - return resumed - - try: - c.call('d.open', h) - result['commands'].append('d.open') - except Exception as exc: - result.setdefault('ignored_errors', []).append(f'd.open: {exc}') - try: - c.call('d.start', h) - result['commands'].append('d.start') - except Exception as exc: - result.setdefault('ignored_errors', []).append(f'd.start: {exc}') - try: - c.call('d.try_start', h) - result['commands'].append('d.try_start') - except Exception as exc2: - result.setdefault('ignored_errors', []).append(f'd.try_start: {exc2}') - result['ok'] = False - result['after'] = _download_runtime_state(c, h) - result['ok'] = result.get('ok', True) - return result - -def action(profile: dict, torrent_hashes: list[str], name: str, payload: dict | None = None) -> dict: - payload = payload or {} - c = client_for(profile) - methods = { - "stop": "d.stop", - "recheck": "d.check_hash", - "reannounce": "d.tracker_announce", - "remove": "d.erase", - } - if name == "set_label": - label = str(payload.get("label") or "").strip() - for h in torrent_hashes: - c.call("d.custom1.set", h, label) - return {"ok": True, "count": len(torrent_hashes), "label": label} - if name == "set_ratio_group": - group = str(payload.get("ratio_group") or "").strip() - for h in torrent_hashes: - c.call("d.custom.set", h, "py_ratio_group", group) - return {"ok": True, "count": len(torrent_hashes), "ratio_group": group} - if name == "move": - path = _remote_clean_path(payload.get("path") or "") - move_data = bool(payload.get("move_data")) - recheck = bool(payload.get("recheck", move_data)) - keep_seeding = bool(payload.get("keep_seeding")) - # Note: Automations can force seeding after a physical move even if the torrent was not active before. - if not path: - raise ValueError("Missing path") - results = [] - if move_data: - _rt_execute_allow_timeout(c, "execute.throw", "mkdir", "-p", path) - for h in torrent_hashes: - item = {"hash": h, "path": path, "move_data": move_data, "keep_seeding": keep_seeding} - try: - was_state = int(c.call("d.state", h) or 0) - except Exception: - was_state = 0 - try: - was_active = int(c.call("d.is_active", h) or 0) - except Exception: - was_active = was_state - if move_data: - src = _remote_clean_path(_torrent_data_path(c, h)) - if not src: - raise ValueError(f"Cannot determine source path for {h}") - dst = _remote_join(path, posixpath.basename(src.rstrip("/"))) - if src != dst: - try: - c.call("d.stop", h) - except Exception: - pass - try: - c.call("d.close", h) - except Exception: - pass - _run_remote_move(c, src, dst) - item["moved_from"] = src - item["moved_to"] = dst - else: - item["skipped"] = "source and destination are the same" - c.call("d.directory.set", h, path) - if recheck: - try: - c.call("d.check_hash", h) - except Exception as exc: - item["recheck_error"] = str(exc) - if keep_seeding or was_state or was_active: - try: - c.call("d.start", h) - item["started_after_move"] = True - except Exception as exc: - item["start_after_move_error"] = str(exc) - else: - c.call("d.directory.set", h, path) - results.append(item) - return {"ok": True, "count": len(torrent_hashes), "move_data": move_data, "keep_seeding": keep_seeding, "results": results} - if name == "pause": - # Note: The app pause action is now a pure d.pause so later resume works without stop/start. - results = [pause_hash(c, h) for h in torrent_hashes] - return {"ok": True, "count": len(torrent_hashes), "remove_data": False, "results": results} - if name in {"resume", "unpause"}: - # Note: Resume/Unpause uses only d.resume for Paused state. - results = [resume_paused_hash(c, h) for h in torrent_hashes] - return {"ok": True, "count": len(torrent_hashes), "remove_data": False, "results": results} - if name == "start": - # Note: Start separates Stopped from Paused; paused items go through d.resume, stopped items through d.start. - results = [start_or_resume_hash(c, h) for h in torrent_hashes] - return {"ok": True, "count": len(torrent_hashes), "remove_data": False, "results": results} - - method = methods.get(name) - if not method: - raise ValueError(f"Unknown action: {name}") - remove_data = bool(payload.get("remove_data")) if name == "remove" else False - results = [] - for h in torrent_hashes: - if remove_data: - results.append(_remove_torrent_data(c, h)) - c.call(method, h) - if name == "recheck": - # Note: Recheck is tracked so even very fast checks still receive the after-check start/stop policy. - _mark_post_check_watch(int(profile.get("id") or 0), h) - return {"ok": True, "count": len(torrent_hashes), "remove_data": remove_data, "results": results} - -def add_magnet(profile: dict, uri: str, start: bool = True, directory: str = "", label: str = "") -> dict: - c = client_for(profile) - commands = [] - if directory: - commands.append(f"d.directory.set={directory}") - if label: - commands.append(f"d.custom1.set={label}") - if start: - c.load.start_verbose("", uri, *commands) - else: - c.load.normal("", uri, *commands) - return {"ok": True} - - -def set_limits(profile: dict, down: int | None, up: int | None): - """Set global speed limits in bytes/s. - - rTorrent XML-RPC setters need an empty target string as the first - argument. Without it rTorrent returns: target must be a string. - """ - c = client_for(profile) - if down is not None: - c.call("throttle.global_down.max_rate.set", "", int(down)) - if up is not None: - c.call("throttle.global_up.max_rate.set", "", int(up)) - return {"ok": True, "down": int(down or 0), "up": int(up or 0)} - - -def add_torrent_raw(profile: dict, data: bytes, start: bool = True, directory: str = "", label: str = "", file_priorities: list[dict] | None = None) -> dict: - c = client_for(profile) - commands = [] - if directory: - commands.append(f"d.directory.set={directory}") - if label: - commands.append(f"d.custom1.set={label}") - # Note: File selection before start loads the torrent stopped, changes priorities, then starts it if requested. - method = "load.raw" if file_priorities else ("load.raw_start" if start else "load.raw") - c.call(method, "", Binary(data), *commands) - info_hash = "" - if file_priorities: - try: - from .torrent_meta import parse_torrent - info_hash = parse_torrent(data).get("info_hash") or "" - set_file_priorities(profile, info_hash, file_priorities) - if start: - c.call("d.start", info_hash) - except Exception as exc: - return {"ok": False, "info_hash": info_hash, "error": str(exc)} - return {"ok": True, "info_hash": info_hash}