from __future__ import annotations

import errno
import os
import posixpath
import socket
import time
import uuid
from urllib.parse import urlparse
from xmlrpc.client import Binary, dumps, loads
from pathlib import Path as LocalPath
from ..utils import human_rate, human_size
from ..db import connect, default_user_id, utcnow
from ..config import PYTORRENT_TMP_DIR, REMOTE_READ_CHUNK_BYTES


class ScgiMethod:
    def __init__(self, client: "ScgiRtorrentClient", name: str):
        self.client = client
        self.name = name

    def __getattr__(self, name: str):
        return ScgiMethod(self.client, f"{self.name}.{name}")

    def __call__(self, *args):
        return self.client.call(self.name, *args)


class ScgiRtorrentClient:
    """XML-RPC over SCGI client for rTorrent network.scgi.open_port."""

    def __init__(self, url: str, timeout: int = 5):
        parsed = urlparse(url)
        if parsed.scheme != "scgi":
            raise ValueError("SCGI URL must start with scgi://")
        if not parsed.hostname or not parsed.port:
            raise ValueError("SCGI URL must include host and port, e.g. scgi://127.0.0.1:5000/RPC2")
        self.host = parsed.hostname
        self.port = parsed.port
        self.timeout = timeout
        self.path = parsed.path or "/RPC2"

    def __getattr__(self, name: str):
        return ScgiMethod(self, name)

    def call(self, method_name: str, *args):
        body = dumps(args, methodname=method_name, allow_none=True).encode("utf-8")
        headers = {
            "CONTENT_LENGTH": str(len(body)),
            "SCGI": "1",
            "REQUEST_METHOD": "POST",
            "REQUEST_URI": self.path,
            "SCRIPT_NAME": self.path,
            "SERVER_PROTOCOL": "HTTP/1.1",
            "CONTENT_TYPE": "text/xml",
        }
        header_blob = b"".join(k.encode() + b"\0" + v.encode() + b"\0" for k, v in headers.items())
        payload = str(len(header_blob)).encode("ascii") + b":" + header_blob + b"," + body
        attempts = _scgi_retry_attempts()
        last_exc = None
        for attempt in range(1, attempts + 1):
            try:
                with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock:
                    sock.settimeout(self.timeout)
                    sock.sendall(payload)
                    chunks: list[bytes] = []
                    while True:
                        chunk = sock.recv(65536)
                        if not chunk:
                            break
                        chunks.append(chunk)
                response = b"".join(chunks)
                if not response:
                    raise ConnectionError("Empty response from rTorrent SCGI")
                if b"\r\n\r\n" in response:
                    response = response.split(b"\r\n\r\n", 1)[1]
                elif b"\n\n" in response:
                    response = response.split(b"\n\n", 1)[1]
                result, _ = loads(response)
                return result[0] if len(result) == 1 else result
            except Exception as exc:
                last_exc = exc
                if attempt >= attempts or not _is_transient_scgi_error(exc):
                    raise
                time.sleep(_scgi_retry_delay(attempt))
        raise last_exc or ConnectionError("rTorrent SCGI call failed")


def _scgi_retry_attempts() -> int:
    # Note: Short retry/backoff protects bulk operations from temporary Errno 111 during high rTorrent load.
    try:
        return max(1, min(10, int(os.environ.get("PYTORRENT_SCGI_RETRIES", "5"))))
    except Exception:
        return 5


def _scgi_retry_delay(attempt: int) -> float:
    return min(5.0, 0.35 * (2 ** max(0, attempt - 1)))


def _is_transient_scgi_error(exc: Exception) -> bool:
    # Note: Retry covers common temporary SCGI/socket errors but does not hide semantic XML-RPC errors.
    if isinstance(exc, (ConnectionRefusedError, ConnectionResetError, TimeoutError, socket.timeout)):
        return True
    err_no = getattr(exc, "errno", None)
    if err_no in {errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT, errno.EHOSTUNREACH, errno.ENETUNREACH}:
        return True
    msg = str(exc).lower()
    return any(text in msg for text in ("connection refused", "connection reset", "timed out", "timeout", "empty response", "pipe creation failed", "resource temporarily unavailable", "try again", "temporarily unavailable"))


def client_for(profile: dict) -> ScgiRtorrentClient:
    return ScgiRtorrentClient(profile["scgi_url"], int(profile.get("timeout_seconds") or 5))


_UNSUPPORTED_EXEC_METHODS: set[str] = set()
_EXEC_TARGET_STYLE: dict[str, int] = {}

def _rt_execute_preview(method_name: str, call_args: tuple) -> str:
    # Note: The compact RPC summary removes long scripts from error messages while keeping the method and first arguments for diagnostics.
    preview = ", ".join(repr(x) for x in call_args[:3])
    if len(call_args) > 3:
        preview += ", ..."
    return f"{method_name}({preview})"


def _rt_execute_target_variants(method: str, args: tuple) -> list[tuple]:
    # Note: Depending on version, rTorrent XML-RPC either requires or rejects an empty target; cache the working variant per method.
    variants = [("", *args), args]
    preferred = _EXEC_TARGET_STYLE.get(method)
    if preferred is not None and 0 <= preferred < len(variants):
        return [variants[preferred]] + [v for i, v in enumerate(variants) if i != preferred]
    return variants


def _is_rt_method_missing(exc: Exception) -> bool:
    msg = str(exc).lower()
    return "not defined" in msg or "no such method" in msg or "unknown method" in msg


def _rt_execute_methods(method: str) -> list[str]:
    # Note: execute2.* is tried only when the base execute.* method does not exist to avoid false retry errors.
    methods = [method]
    if method.startswith("execute."):
        fallback = method.replace("execute.", "execute2.", 1)
        if fallback not in _UNSUPPORTED_EXEC_METHODS:
            methods.append(fallback)
    return methods


def _rt_execute(c: ScgiRtorrentClient, method: str, *args):
    """Run rTorrent execute.* as the rTorrent user across XML-RPC variants."""
    errors: list[str] = []
    attempts = _scgi_retry_attempts()
    for attempt in range(1, attempts + 1):
        errors.clear()
        transient_seen = False
        primary_missing = False
        for method_index, method_name in enumerate(_rt_execute_methods(method)):
            if method_name in _UNSUPPORTED_EXEC_METHODS:
                continue
            if method_index > 0 and not primary_missing:
                continue
            for call_args in _rt_execute_target_variants(method_name, args):
                try:
                    result = c.call(method_name, *call_args)
                    if method_name == method:
                        _EXEC_TARGET_STYLE[method_name] = 0 if call_args and call_args[0] == "" else 1
                    return result
                except Exception as exc:
                    if _is_rt_method_missing(exc):
                        _UNSUPPORTED_EXEC_METHODS.add(method_name)
                        if method_name == method:
                            primary_missing = True
                        errors.append(f"{method_name}: method not defined")
                        break
                    transient_seen = transient_seen or _is_transient_scgi_error(exc)
                    errors.append(f"{_rt_execute_preview(method_name, call_args)}: {exc}")
        if transient_seen and attempt < attempts:
            time.sleep(_scgi_retry_delay(attempt))
            continue
        break
    raise RuntimeError("rTorrent execute failed: " + "; ".join(errors))


def _is_rt_timeout_error(exc: Exception) -> bool:
    msg = str(exc).lower()
    return isinstance(exc, (TimeoutError, socket.timeout)) or "timed out" in msg or "timeout" in msg


def _rt_execute_allow_timeout(c: ScgiRtorrentClient, method: str, *args):
    try:
        return _rt_execute(c, method, *args)
    except Exception as exc:
        if _is_rt_timeout_error(exc):
            return None
        raise


def _remote_clean_path(path: str) -> str:
    path = str(path or "").strip()
    return posixpath.normpath(path) if path else path


def _remote_join(*parts: str) -> str:
    cleaned = [str(p).strip().rstrip("/") for p in parts if str(p).strip()]
    return posixpath.normpath(posixpath.join(*cleaned)) if cleaned else ""


def _run_remote_move(c: ScgiRtorrentClient, src: str, dst: str, poll_interval: float = 2.0) -> None:
    """Run a remote mv without binding the transfer time to the SCGI timeout."""
    token = uuid.uuid4().hex
    status_path = f"/tmp/pytorrent-move-{token}.status"
    start_script = (
        'src=$1; dst=$2; status=$3; tmp=${status}.tmp; '
        'rm -f "$status" "$tmp"; '
        '( '
        'rc=0; '
        'parent=${dst%/*}; '
        'if [ -z "$dst" ] || [ "$dst" = "/" ]; then echo "unsafe destination: $dst" >&2; rc=5; fi; '
        'if [ $rc -eq 0 ] && [ -n "$parent" ] && [ "$parent" != "$dst" ]; then mkdir -p "$parent" || rc=$?; fi; '
        'if [ $rc -eq 0 ] && [ "$src" = "$dst" ]; then :; '
        'elif [ $rc -eq 0 ] && { [ -e "$dst" ] || [ -L "$dst" ]; } && [ ! -e "$src" ] && [ ! -L "$src" ]; then :; '
        'elif [ $rc -eq 0 ] && [ ! -e "$src" ] && [ ! -L "$src" ]; then echo "source missing: $src" >&2; rc=3; '
        'elif [ $rc -eq 0 ] && { [ -e "$dst" ] || [ -L "$dst" ]; }; then rm -rf -- "$dst" && mv -f -- "$src" "$dst" || rc=$?; '
        'elif [ $rc -eq 0 ]; then mv -f -- "$src" "$dst" || rc=$?; '
        'fi; '
        'if [ $rc -eq 0 ]; then printf "OK\n" > "$status"; '
        'else printf "ERR %s\n" "$rc" > "$status"; fi; '
        'if [ -s "$tmp" ]; then cat "$tmp" >> "$status"; fi; '
        'rm -f "$tmp" '
        ') > "$tmp" 2>&1 &'
    )
    poll_script = 'status=$1; [ -f "$status" ] && cat "$status" || true'
    cleanup_script = 'rm -f "$1"'

    _rt_execute_allow_timeout(c, "execute.throw", "sh", "-c", start_script, "pytorrent-move-start", src, dst, status_path)

    while True:
        time.sleep(max(0.25, poll_interval))
        try:
            output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-move-poll", status_path) or "").strip()
        except Exception as exc:
            # Note: During bulk moves, rTorrent may briefly not create the execute.capture pipe; polling waits and retries.
            if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc):
                continue
            raise
        if not output:
            continue
        try:
            _rt_execute(c, "execute.throw", "sh", "-c", cleanup_script, "pytorrent-move-clean", status_path)
        except Exception:
            pass
        first_line = output.splitlines()[0].strip()
        if first_line == "OK":
            return
        if first_line.startswith("ERR"):
            details = "\n".join(output.splitlines()[1:]).strip()
            raise RuntimeError(details or first_line)
        raise RuntimeError(output)


def _torrent_data_path(c: ScgiRtorrentClient, torrent_hash: str) -> str:
    """Return data path as rTorrent sees it; do not touch pyTorrent local FS."""
    try:
        src = str(c.call("d.base_path", torrent_hash) or "").strip()
        if src:
            return src
    except Exception:
        pass
    directory = str(c.call("d.directory", torrent_hash) or "").strip()
    name = str(c.call("d.name", torrent_hash) or "").strip()
    try:
        is_multi = int(c.call("d.is_multi_file", torrent_hash) or 0)
    except Exception:
        is_multi = 0
    if is_multi:
        return directory
    if directory and name:
        return _remote_join(directory, name)
    return directory


def _safe_rm_rf_path(path: str) -> str:
    path = _remote_clean_path(path)
    if not path or path in {"/", "."}:
        raise ValueError("Refusing to remove an unsafe data path")
    if path.rstrip("/").count("/") < 1:
        raise ValueError(f"Refusing to remove an unsafe data path: {path}")
    return path


def _run_remote_rm(c: ScgiRtorrentClient, path: str, poll_interval: float = 2.0) -> None:
    # Note: rm -rf runs in the background on the rTorrent side, so long deletes do not hold a single SCGI connection.
    token = uuid.uuid4().hex
    status_path = f"/tmp/pytorrent-rm-{token}.status"
    script = (
        'target=$1; status=$2; tmp=${status}.tmp; '
        'rm -f "$status" "$tmp"; '
        '( rc=0; '
        'if [ -z "$target" ] || [ "$target" = "/" ] || [ "$target" = "." ]; then echo "unsafe remove target: $target" >&2; rc=5; '
        'else rm -rf -- "$target" || rc=$?; fi; '
        'if [ $rc -eq 0 ]; then printf "OK\n" > "$status"; else printf "ERR %s\n" "$rc" > "$status"; fi; '
        'if [ -s "$tmp" ]; then cat "$tmp" >> "$status"; fi; '
        'rm -f "$tmp" ) > "$tmp" 2>&1 &'
    )
    poll_script = 'status=$1; [ -f "$status" ] && cat "$status" || true'
    cleanup_script = 'rm -f "$1"'
    _rt_execute_allow_timeout(c, "execute.throw", "sh", "-c", script, "pytorrent-rm-start", path, status_path)
    while True:
        time.sleep(max(0.25, poll_interval))
        try:
            output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-rm-poll", status_path) or "").strip()
        except Exception as exc:
            # Note: Remove uses the same safe polling as move, so a temporary missing pipe does not fail the whole queue.
            if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc):
                continue
            raise
        if not output:
            continue
        try:
            _rt_execute(c, "execute.throw", "sh", "-c", cleanup_script, "pytorrent-rm-clean", status_path)
        except Exception:
            pass
        first_line = output.splitlines()[0].strip()
        if first_line == "OK":
            return
        if first_line.startswith("ERR"):
            details = "\n".join(output.splitlines()[1:]).strip()
            raise RuntimeError(details or first_line)
        raise RuntimeError(output)


def _remove_torrent_data(c: ScgiRtorrentClient, torrent_hash: str) -> dict:
    data_path = _safe_rm_rf_path(_torrent_data_path(c, torrent_hash))
    try:
        c.call("d.stop", torrent_hash)
    except Exception:
        pass
    try:
        c.call("d.close", torrent_hash)
    except Exception:
        pass
    _run_remote_rm(c, data_path)
    return {"hash": torrent_hash, "removed_path": data_path}


def browse_path(profile: dict, path: str | None = None) -> dict:
    """List directories through rTorrent execute.capture to avoid pyTorrent FS permissions."""
    c = client_for(profile)
    base = _remote_clean_path(path or default_download_path(profile))
    script = (
        'base=$1; '
        '[ -d "$base" ] || exit 2; '
        'for p in "$base"/* "$base"/.[!.]* "$base"/..?*; do '
        '[ -d "$p" ] || continue; '
        'name=${p##*/}; '
        'printf "%s\t%s\n" "$name" "$p"; '
        'done'
    )
    output = _rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-browse", base)
    dirs = []
    for line in str(output or "").splitlines():
        if "\t" not in line:
            continue
        name, full_path = line.split("\t", 1)
        if name not in {".", ".."}:
            dirs.append({"name": name, "path": full_path})
    dirs.sort(key=lambda x: x["name"].lower())
    parent = posixpath.dirname(base.rstrip("/")) or "/"
    if parent == base:
        parent = base
    return {"path": base, "parent": parent, "dirs": dirs[:300], "source": "rtorrent"}


POST_CHECK_DOWNLOAD_LABEL = "To download after check"
_POST_CHECK_WATCH_TTL_SECONDS = 48 * 60 * 60
_POST_CHECK_WATCH_MIN_SECONDS = 2.0
_POST_CHECK_WATCH: dict[int, dict[str, float]] = {}


def _mark_post_check_watch(profile_id: int, torrent_hash: str) -> None:
    if not torrent_hash:
        return
    _POST_CHECK_WATCH.setdefault(int(profile_id), {})[str(torrent_hash)] = time.time()


def _clear_post_check_watch(profile_id: int, torrent_hash: str) -> None:
    profile_watch = _POST_CHECK_WATCH.get(int(profile_id))
    if not profile_watch:
        return
    profile_watch.pop(str(torrent_hash), None)
    if not profile_watch:
        _POST_CHECK_WATCH.pop(int(profile_id), None)


def _is_post_check_watched(profile_id: int, torrent_hash: str) -> bool:
    profile_watch = _POST_CHECK_WATCH.get(int(profile_id)) or {}
    started_at = profile_watch.get(str(torrent_hash))
    if not started_at:
        return False
    age = time.time() - started_at
    if age > _POST_CHECK_WATCH_TTL_SECONDS:
        _clear_post_check_watch(profile_id, torrent_hash)
        return False
    # Note: A short grace period prevents labeling a recheck that was queued but has not visibly entered hashing yet.
    return age >= _POST_CHECK_WATCH_MIN_SECONDS


def _label_names(value: str) -> list[str]:
    names: list[str] = []
    for part in str(value or "").replace(";", ",").replace("|", ",").split(","):
        label = part.strip()
        if label and label not in names:
            names.append(label)
    return names


def _label_value(labels: list[str]) -> str:
    return ", ".join([label for label in labels if str(label or "").strip()])


def _without_post_check_download_label(value: str | None) -> str:
    return _label_value([label for label in _label_names(str(value or "")) if label != POST_CHECK_DOWNLOAD_LABEL])


def clear_post_check_download_label(c: ScgiRtorrentClient, torrent_hash: str, current_label: str | None = None) -> bool:
    label_source = current_label
    if label_source is None:
        try:
            label_source = str(c.call("d.custom1", str(torrent_hash or "")) or "")
        except Exception:
            label_source = ""
    labels = _label_names(str(label_source or ""))
    if POST_CHECK_DOWNLOAD_LABEL not in labels:
        return False
    # Note: The temporary post-check label is removed only after the torrent leaves the stopped waiting queue.
    c.call("d.custom1.set", str(torrent_hash or ""), _label_value([label for label in labels if label != POST_CHECK_DOWNLOAD_LABEL]))
    return True


def _message_indicates_active_check(message: str) -> bool:
    msg = str(message or "").lower()
    if not msg:
        return False
    finished_markers = ("complete", "completed", "finished", "success", "succeeded", "failed", "done")
    if any(marker in msg for marker in finished_markers):
        return False
    active_markers = ("checking", "hashing", "hash check queued", "hash check scheduled", "check hash queued", "recheck queued", "rechecking")
    return any(marker in msg for marker in active_markers)


def _row_progress_complete(row: dict) -> bool:
    size = int(row.get("size") or 0)
    completed = int(row.get("completed_bytes") or 0)
    return bool(row.get("complete")) or (size > 0 and completed >= size) or float(row.get("progress") or 0) >= 100.0


def _cleanup_post_check_label_if_ready(c: ScgiRtorrentClient, row: dict) -> bool:
    labels = _label_names(str(row.get("label") or ""))
    if POST_CHECK_DOWNLOAD_LABEL not in labels:
        return False
    status = str(row.get("status") or "").lower()
    started_after_wait = bool(int(row.get("state") or 0)) and status != "checking"
    if not (_row_progress_complete(row) or status == "seeding" or started_after_wait):
        return False
    # Note: Keep the post-check label while the torrent is stopped; remove it once it is started for download/seeding.
    clear_post_check_download_label(c, str(row.get("hash") or ""), str(row.get("label") or ""))
    row["label"] = _without_post_check_download_label(str(row.get("label") or ""))
    return True


def apply_post_check_policy(profile: dict, rows: list[dict], previous_rows: dict[str, dict] | None = None) -> list[dict]:
    """Start complete torrents after check; stop and label incomplete ones for Smart Queue."""
    previous_rows = previous_rows or {}
    profile_id = int(profile.get("id") or 0)
    c = client_for(profile)
    changes: list[dict] = []
    for row in rows:
        h = str(row.get("hash") or "")
        prev = previous_rows.get(h) or {}
        try:
            if h and _cleanup_post_check_label_if_ready(c, row):
                changes.append({"hash": h, "action": "remove_post_check_label"})
        except Exception as exc:
            changes.append({"hash": h, "action": "remove_post_check_label_failed", "error": str(exc)})
        was_checking = str(prev.get("status") or "") == "Checking" or int(prev.get("hashing") or 0) > 0
        watched_recheck = _is_post_check_watched(profile_id, h)
        is_checking = str(row.get("status") or "") == "Checking" or int(row.get("hashing") or 0) > 0
        if not h or not (was_checking or watched_recheck) or is_checking:
            continue
        complete = _row_progress_complete(row)
        try:
            if complete:
                # Note: A fully checked torrent is started with the same helper as the manual Start action so it seeds immediately.
                start_result = start_or_resume_hash(c, h)
                clear_post_check_download_label(c, h, str(row.get("label") or ""))
                row.update({"state": 1, "active": 1, "paused": False, "status": "Seeding", "label": _without_post_check_download_label(str(row.get("label") or ""))})
                changes.append({"hash": h, "action": "start_seed_after_check", "complete": True, "result": start_result})
            else:
                labels = _label_names(str(row.get("label") or ""))
                if POST_CHECK_DOWNLOAD_LABEL not in labels:
                    labels.append(POST_CHECK_DOWNLOAD_LABEL)
                label_value = _label_value(labels)
                # Note: Incomplete torrents are left stopped after check so Smart Queue can start them later within the global limit.
                c.call("d.stop", h)
                try:
                    c.call("d.close", h)
                except Exception:
                    pass
                c.call("d.custom1.set", h, label_value)
                row.update({"state": 0, "active": 0, "paused": False, "status": "Stopped", "label": label_value})
                changes.append({"hash": h, "action": "stop_and_label_after_check", "complete": False, "label": POST_CHECK_DOWNLOAD_LABEL})
            _clear_post_check_watch(profile_id, h)
        except Exception as exc:
            changes.append({"hash": h, "action": "post_check_policy_failed", "error": str(exc)})
    return changes


TORRENT_FIELDS = [
    "d.hash=", "d.name=", "d.state=", "d.complete=", "d.size_bytes=", "d.completed_bytes=",
    "d.ratio=", "d.up.rate=", "d.down.rate=", "d.up.total=", "d.down.total=", "d.peers_connected=",
    "d.peers_complete=", "d.priority=", "d.directory=", "d.base_path=", "d.creation_date=", "d.custom1=",
    "d.custom=py_ratio_group", "d.message=", "d.hashing=", "d.is_active=", "d.is_multi_file=",
]

TORRENT_OPTIONAL_FIELDS = [
    "d.timestamp.finished=",
]


def human_duration(seconds: int) -> str:
    # Note: Download ETA is derived locally from remaining bytes and current download speed.
    seconds = max(0, int(seconds or 0))
    if seconds <= 0:
        return '-'
    days, rem = divmod(seconds, 86400)
    hours, rem = divmod(rem, 3600)
    minutes, _ = divmod(rem, 60)
    if days:
        return f"{days}d {hours}h"
    if hours:
        return f"{hours}h {minutes}m"
    return f"{minutes}m"


def normalize_row(row: list) -> dict:
    size = int(row[4] or 0)
    completed = int(row[5] or 0)
    progress = 100.0 if size <= 0 and int(row[3] or 0) else round((completed / size) * 100, 2) if size else 0.0
    ratio_raw = int(row[6] or 0)
    down_rate = int(row[8] or 0)
    up_rate = int(row[7] or 0)
    remaining_bytes = max(0, size - completed)
    eta_seconds = int(remaining_bytes / down_rate) if down_rate > 0 and not int(row[3] or 0) else 0
    directory = str(row[14] or "")
    base_path = str(row[15] or "")
    is_multi_file = int(row[22] or 0) if len(row) > 22 else 0
    completed_at = int(row[23] or 0) if len(row) > 23 else 0

    # Show the selected download location only. Hide the torrent root
    # directory for multi-file torrents and the filename for single-file
    # torrents. Data deletion still uses the full d.base_path elsewhere.
    if base_path and base_path != "/":
        display_parent = posixpath.dirname(base_path.rstrip("/")) or "/"
        display_path = display_parent.rstrip("/") + "/" if display_parent != "/" else display_parent
    elif directory and is_multi_file and directory != "/":
        display_parent = posixpath.dirname(directory.rstrip("/")) or "/"
        display_path = display_parent.rstrip("/") + "/" if display_parent != "/" else display_parent
    elif directory:
        display_path = directory.rstrip("/") + "/" if directory != "/" else directory
    else:
        display_path = ""
    msg = str(row[19] or "")
    msg_l = msg.lower()
    hashing = int(row[20] or 0) if len(row) > 20 else 0
    is_active = int(row[21] or 0) if len(row) > 21 else int(row[2] or 0)
    state = int(row[2] or 0)
    complete = int(row[3] or 0)
    # Note: d.hashing is authoritative; stale "hash check complete" messages must not keep the UI in Checking forever.
    is_checking = bool(hashing) or _message_indicates_active_check(msg_l)
    is_paused = bool(state) and not bool(is_active) and not is_checking
    status = "Checking" if is_checking else "Paused" if is_paused else "Seeding" if complete and state else "Downloading" if state else "Stopped"
    return {
        "hash": str(row[0] or ""),
        "name": str(row[1] or ""),
        "state": state,
        "active": is_active,
        "paused": is_paused,
        "complete": complete,
        "size": size,
        "size_h": human_size(size),
        "completed_bytes": completed,
        "progress": progress,
        "ratio": round(ratio_raw / 1000, 3),
        "up_rate": up_rate,
        "up_rate_h": human_rate(up_rate),
        "down_rate": down_rate,
        "down_rate_h": human_rate(down_rate),
        "eta_seconds": eta_seconds,
        "eta_h": human_duration(eta_seconds) if eta_seconds else "-",
        "up_total": int(row[9] or 0),
        "up_total_h": human_size(row[9] or 0),
        "down_total": int(row[10] or 0),
        "down_total_h": human_size(row[10] or 0),
        "peers": int(row[11] or 0),
        "seeds": int(row[12] or 0),
        "priority": int(row[13] or 0),
        "path": display_path,
        "created": int(row[16] or 0),
        "completed_at": completed_at,
        "label": str(row[17] or ""),
        "ratio_group": str(row[18] or ""),
        "message": msg,
        "status": status,
        "hashing": hashing,
    }


def list_torrents(profile: dict) -> list[dict]:
    c = client_for(profile)
    try:
        rows = c.d.multicall2("", "main", *(TORRENT_FIELDS + TORRENT_OPTIONAL_FIELDS))
    except Exception:
        # Keep compatibility with older rTorrent builds that do not expose optional timestamp fields.
        rows = c.d.multicall2("", "main", *TORRENT_FIELDS)
    return [normalize_row(list(row)) for row in rows]


_DISK_USAGE_CACHE: dict[str, tuple[float, dict]] = {}
_DISK_USAGE_TTL_SECONDS = 30.0
_REMOTE_USAGE_CACHE: dict[int, tuple[float, dict]] = {}
_REMOTE_USAGE_TTL_SECONDS = 60.0
_REMOTE_PUBLIC_IP_CACHE: dict[int, tuple[float, str]] = {}
_REMOTE_PUBLIC_IP_TTL_SECONDS = 6 * 60 * 60.0


def remote_public_ip(profile: dict, force: bool = False) -> str:
    profile_id = int(profile.get("id") or 0)
    now = time.monotonic()
    cached = _REMOTE_PUBLIC_IP_CACHE.get(profile_id)
    if cached and not force and now - cached[0] < _REMOTE_PUBLIC_IP_TTL_SECONDS:
        return cached[1]
    script = (
        'for url in https://ifconfig.co https://ifconfig.me https://ipapi.linuxiarz.pl http://ifconfig.co http://ifconfig.me; do '
        'ip=$(curl -fsS --max-time 8 "$url" 2>/dev/null | tr -d "\r" | head -n 1 | sed "s/[^0-9a-fA-F:.]//g"); '
        'if [ -n "$ip" ]; then printf "%s" "$ip"; exit 0; fi; '
        'done; exit 1'
    )
    value = str(_rt_execute(client_for(profile), "execute.capture", "sh", "-c", script) or "").strip()
    if not value:
        raise RuntimeError("Cannot read remote public IP")
    _REMOTE_PUBLIC_IP_CACHE[profile_id] = (now, value)
    return value


def remote_system_usage(profile: dict, force: bool = False) -> dict:
    profile_id = int(profile.get("id") or 0)
    now = time.monotonic()
    cached = _REMOTE_USAGE_CACHE.get(profile_id)
    if cached and not force and now - cached[0] < _REMOTE_USAGE_TTL_SECONDS:
        usage = dict(cached[1])
        usage["cached"] = True
        return usage
    script = (
        'read cpu user nice system idle iowait irq softirq steal guest guest_nice < /proc/stat; '
        'total1=$((user+nice+system+idle+iowait+irq+softirq+steal)); idle1=$((idle+iowait)); '
        'sleep 1; '
        'read cpu user nice system idle iowait irq softirq steal guest guest_nice < /proc/stat; '
        'total2=$((user+nice+system+idle+iowait+irq+softirq+steal)); idle2=$((idle+iowait)); '
        'dt=$((total2-total1)); di=$((idle2-idle1)); '
        'cpu_pct=$(awk -v dt="$dt" -v di="$di" "BEGIN { if (dt > 0) printf \"%.1f\", (dt-di)*100/dt; else printf \"0.0\" }"); '
        "mem_total=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo); "
        "mem_avail=$(awk '/^MemAvailable:/ {print $2}' /proc/meminfo); "
        'ram_pct=$(awk -v t="$mem_total" -v a="$mem_avail" "BEGIN { if (t > 0) printf \"%.1f\", (t-a)*100/t; else printf \"0.0\" }"); '
        'printf "%s %s" "$cpu_pct" "$ram_pct"'
    )
    output = str(_rt_execute(client_for(profile), "execute.capture", "sh", "-c", script) or "").strip()
    parts = output.split()
    if len(parts) < 2:
        raise RuntimeError(f"Cannot read remote CPU/RAM usage: {output}")
    usage = {"cpu": float(parts[0]), "ram": float(parts[1]), "source": "rtorrent-remote", "usage_source": "rtorrent-remote", "cached": False}
    _REMOTE_USAGE_CACHE[profile_id] = (now, usage)
    return dict(usage)


def _usage_dict(total: int, used: int, free: int) -> dict:
    total = max(0, int(total or 0))
    used = max(0, int(used or 0))
    free = max(0, int(free or 0))
    pct = round((used / total) * 100, 1) if total else 0.0
    return {
        "ok": True,
        "total": total,
        "used": used,
        "free": free,
        "total_h": human_size(total),
        "used_h": human_size(used),
        "free_h": human_size(free),
        "percent": pct,
    }


def _statvfs_usage(path: str) -> dict:
    stat = os.statvfs(path)
    total = int(stat.f_blocks * stat.f_frsize)
    free = int(stat.f_bavail * stat.f_frsize)
    used = max(0, total - free)
    return _usage_dict(total, used, free)


def _remote_df_usage(profile: dict, path: str) -> dict:
    # Note: Disk paths belong to the rTorrent host. Query df through rTorrent so NFS/Btrfs mounts are measured correctly.
    clean_path = _remote_clean_path(path or os.sep)
    cache_key = f"remote-df:{profile.get('id')}:{clean_path}"
    now = time.monotonic()
    cached = _DISK_USAGE_CACHE.get(cache_key)
    if cached and now - cached[0] < _DISK_USAGE_TTL_SECONDS:
        return dict(cached[1])
    script = (
        'path=$1; '
        'if [ ! -e "$path" ]; then echo "ERR\tmissing path"; exit 0; fi; '
        'line=$(df -Pk "$path" 2>/dev/null | tail -n 1); '
        'if [ -z "$line" ]; then echo "ERR\tdf failed"; exit 0; fi; '
        'set -- $line; pct=${5%\\%}; '
        'if [ -z "$2" ] || [ -z "$3" ] || [ -z "$4" ]; then echo "ERR\tdf parse failed"; exit 0; fi; '
        'printf "OK\t%s\t%s\t%s\t%s\t%s\n" "$2" "$3" "$4" "$pct" "$6"'
    )
    output = str(_rt_execute(client_for(profile), "execute.capture", "sh", "-c", script, "pytorrent-df", clean_path) or "").strip()
    first_line = output.splitlines()[0] if output else ""
    parts = first_line.split("\t")
    if len(parts) >= 6 and parts[0] == "OK":
        total = int(parts[1]) * 1024
        used = int(parts[2]) * 1024
        free = int(parts[3]) * 1024
        usage = _usage_dict(total, used, free)
        usage.update({"path": clean_path, "source_path": parts[5] or clean_path, "fallback": False, "measure_source": "rtorrent-df"})
    else:
        error = parts[1] if len(parts) > 1 else (output or "df returned no data")
        usage = {"ok": False, "path": clean_path, "source_path": clean_path, "error": error, "percent": 0, "measure_source": "rtorrent-df"}
    _DISK_USAGE_CACHE[cache_key] = (now, dict(usage))
    return usage


def _disk_usage_for_path(profile: dict, path: str, allow_parent_fallback: bool = False) -> dict:
    clean_path = _remote_clean_path(path or os.sep)
    try:
        return _remote_df_usage(profile, clean_path)
    except Exception as remote_exc:
        try:
            usage = _statvfs_usage(clean_path)
            usage.update({"path": clean_path, "source_path": clean_path, "fallback": False, "measure_source": "local-statvfs", "warning": str(remote_exc)})
            return usage
        except Exception as first_exc:
            usage = {"ok": False, "path": clean_path, "source_path": clean_path, "error": str(first_exc), "warning": str(remote_exc), "percent": 0}
            if not allow_parent_fallback:
                return usage
            probe = os.path.abspath(clean_path or os.sep)
            seen = set()
            while probe and probe not in seen:
                seen.add(probe)
                parent = os.path.dirname(probe)
                if parent == probe:
                    break
                probe = parent
                try:
                    usage = _statvfs_usage(probe)
                    usage.update({"path": clean_path, "source_path": probe, "fallback": True, "measure_source": "local-statvfs", "warning": str(first_exc)})
                    break
                except Exception:
                    continue
            return usage


def disk_usage_for_default_path(profile: dict) -> dict:
    """Filesystem usage for the rTorrent default download directory."""
    path = default_download_path(profile)
    cache_key = f"default-disk:{profile.get('id')}:{path}"
    now = time.monotonic()
    cached = _DISK_USAGE_CACHE.get(cache_key)
    if cached and now - cached[0] < _DISK_USAGE_TTL_SECONDS:
        return dict(cached[1])
    usage = _disk_usage_for_path(profile, path, allow_parent_fallback=True)
    _DISK_USAGE_CACHE[cache_key] = (now, dict(usage))
    return usage


def disk_usage_for_paths(profile: dict, paths: list[str] | None = None, mode: str = 'default', selected_path: str = '') -> dict:
    # Note: Aggregate/selected modes measure exact user paths on the rTorrent host; they do not fall back to parent/root partitions.
    default_path = default_download_path(profile)
    mode = mode if mode in {'default', 'selected', 'aggregate'} else 'default'
    user_paths: list[str] = []
    for item in paths or []:
        path = _remote_clean_path(str(item or '').strip())
        if path and path not in user_paths:
            user_paths.append(path)
    selected_path = _remote_clean_path(str(selected_path or '').strip())
    if mode == 'selected':
        source_paths = [selected_path] if selected_path else list(user_paths)
    elif mode == 'aggregate':
        source_paths = list(user_paths)
    else:
        source_paths = [default_path]
    if mode in {'selected', 'aggregate'} and not source_paths:
        source_paths = [default_path]
    clean_paths: list[str] = []
    for item in source_paths:
        path = _remote_clean_path(str(item or '').strip())
        if path and path not in clean_paths:
            clean_paths.append(path)
    entries = [_disk_usage_for_path(profile, path, allow_parent_fallback=(mode == 'default')) for path in clean_paths]
    chosen = entries[0] if entries else _disk_usage_for_path(profile, default_path, allow_parent_fallback=True)
    if mode == 'selected' and selected_path:
        chosen = next((x for x in entries if x.get('path') == selected_path), chosen)
    elif mode == 'aggregate':
        ok_entries = [x for x in entries if x.get('ok')]
        total = sum(int(x.get('total') or 0) for x in ok_entries)
        used = sum(int(x.get('used') or 0) for x in ok_entries)
        free = sum(int(x.get('free') or 0) for x in ok_entries)
        chosen = _usage_dict(total, used, free) if ok_entries else {"ok": False, "total": 0, "used": 0, "free": 0, "total_h": "0 B", "used_h": "0 B", "free_h": "0 B", "percent": 0}
        chosen.update({'path': 'aggregate', 'source_path': 'aggregate', 'fallback': False, 'measure_source': 'rtorrent-df'})
    chosen = dict(chosen)
    chosen['mode'] = mode
    chosen['paths'] = entries
    return chosen


def _safe_rtorrent_int(callable_obj, default=None):
    """Return an integer rTorrent metric without failing the whole status poll."""
    try:
        value = callable_obj()
        return int(value)
    except Exception:
        return default


def _safe_rtorrent_time(c):
    """Read rTorrent server time when supported; otherwise let the browser clock remain authoritative."""
    candidates = (
        lambda: c.system.time_seconds(),
        lambda: c.system.time(),
    )
    for candidate in candidates:
        value = _safe_rtorrent_int(candidate)
        if value:
            return value
    return None

def system_status(profile: dict) -> dict:
    c = client_for(profile)
    version = str(c.system.client_version())
    try:
        down_limit = int(c.throttle.global_down.max_rate())
    except Exception:
        down_limit = 0
    try:
        up_limit = int(c.throttle.global_up.max_rate())
    except Exception:
        up_limit = 0
    rows = list_torrents(profile)
    # Note: ruTorrent-style footer metrics. Missing XMLRPC methods are shown as unavailable instead of breaking polling.
    open_sockets = _safe_rtorrent_int(lambda: c.network.open_sockets())
    max_open_sockets = _safe_rtorrent_int(lambda: c.network.max_open_sockets())
    rtorrent_time = _safe_rtorrent_time(c)
    checking_count = sum(1 for t in rows if t.get("status") == "Checking" or int(t.get("hashing") or 0) > 0)
    return {
        "ok": True,
        "version": version,
        "total": len(rows),
        "active": sum(1 for t in rows if t["state"]),
        "seeding": sum(1 for t in rows if t["complete"] and t["state"] and not t.get("paused")),
        "leeching": sum(1 for t in rows if not t["complete"] and t["state"] and not t.get("paused") and t.get("status") != "Checking"),
        "checking": checking_count,
        "paused": sum(1 for t in rows if t.get("paused")),
        "stopped": sum(1 for t in rows if not t["state"]),
        "down_rate": sum(t["down_rate"] for t in rows),
        "down_rate_h": human_rate(sum(t["down_rate"] for t in rows)),
        "up_rate": sum(t["up_rate"] for t in rows),
        "up_rate_h": human_rate(sum(t["up_rate"] for t in rows)),
        "down_limit": down_limit,
        "up_limit": up_limit,
        "down_limit_h": human_rate(down_limit) if down_limit else "∞",
        "up_limit_h": human_rate(up_limit) if up_limit else "∞",
        "total_down": sum(t["down_total"] for t in rows),
        "total_up": sum(t["up_total"] for t in rows),
        "total_down_h": human_size(sum(t["down_total"] for t in rows)),
        "total_up_h": human_size(sum(t["up_total"] for t in rows)),
        "open_sockets": open_sockets,
        "max_open_sockets": max_open_sockets,
        "rtorrent_time": rtorrent_time,
        "disk": disk_usage_for_default_path(profile),
    }


def scgi_diagnostics(profile: dict) -> dict:
    c = client_for(profile)
    started = time.perf_counter()
    body = dumps((), methodname="system.client_version", allow_none=True).encode("utf-8")
    headers = {
        "CONTENT_LENGTH": str(len(body)),
        "SCGI": "1",
        "REQUEST_METHOD": "POST",
        "REQUEST_URI": c.path,
        "SCRIPT_NAME": c.path,
        "SERVER_PROTOCOL": "HTTP/1.1",
        "CONTENT_TYPE": "text/xml",
    }
    header_blob = b"".join(k.encode() + b"\0" + v.encode() + b"\0" for k, v in headers.items())
    payload = str(len(header_blob)).encode("ascii") + b":" + header_blob + b"," + body
    metrics = {
        "url": profile.get("scgi_url"),
        "host": c.host,
        "port": c.port,
        "path": c.path,
        "timeout_seconds": c.timeout,
        "request_bytes": len(payload),
    }
    connect_started = time.perf_counter()
    with socket.create_connection((c.host, c.port), timeout=c.timeout) as sock:
        sock.settimeout(c.timeout)
        metrics["connect_ms"] = round((time.perf_counter() - connect_started) * 1000, 2)
        send_started = time.perf_counter()
        sock.sendall(payload)
        metrics["send_ms"] = round((time.perf_counter() - send_started) * 1000, 2)
        chunks: list[bytes] = []
        first_byte_at = None
        while True:
            chunk = sock.recv(65536)
            if chunk and first_byte_at is None:
                first_byte_at = time.perf_counter()
            if not chunk:
                break
            chunks.append(chunk)
    response = b"".join(chunks)
    metrics["response_bytes"] = len(response)
    metrics["first_byte_ms"] = round(((first_byte_at or time.perf_counter()) - started) * 1000, 2)
    metrics["total_ms"] = round((time.perf_counter() - started) * 1000, 2)
    if not response:
        raise ConnectionError("Empty response from rTorrent SCGI")
    xml_response = response
    if b"\r\n\r\n" in xml_response:
        xml_response = xml_response.split(b"\r\n\r\n", 1)[1]
    elif b"\n\n" in xml_response:
        xml_response = xml_response.split(b"\n\n", 1)[1]
    result, _ = loads(xml_response)
    metrics["xml_bytes"] = len(xml_response)
    metrics["client_version"] = str(result[0]) if result else ""
    metrics["ok"] = True
    return metrics


def torrent_files(profile: dict, torrent_hash: str) -> list[dict]:
    rows = client_for(profile).f.multicall(torrent_hash, "", "f.path=", "f.size_bytes=", "f.completed_chunks=", "f.size_chunks=", "f.priority=")
    files = []
    for idx, r in enumerate(rows):
        size = int(r[1] or 0)
        completed_chunks = int(r[2] or 0)
        size_chunks = int(r[3] or 0)
        progress = 100.0 if size <= 0 else round((completed_chunks / size_chunks) * 100, 2) if size_chunks else 0.0
        files.append({
            "index": idx,
            "path": r[0],
            "size": size,
            "size_h": human_size(size),
            "completed_chunks": completed_chunks,
            "size_chunks": size_chunks,
            "progress": min(100.0, max(0.0, progress)),
            "priority": int(r[4] or 0),
        })
    return files


def torrent_file_tree(profile: dict, torrent_hash: str) -> dict:
    # Note: The tree is built from rTorrent file paths without changing the existing flat file API.
    root = {"name": "", "path": "", "type": "directory", "size": 0, "children": {}}
    for item in torrent_files(profile, torrent_hash):
        parts = [part for part in str(item.get("path") or "").split("/") if part]
        node = root
        prefix: list[str] = []
        for part in parts[:-1]:
            prefix.append(part)
            children = node.setdefault("children", {})
            node = children.setdefault(part, {"name": part, "path": "/".join(prefix), "type": "directory", "size": 0, "children": {}})
        name = parts[-1] if parts else str(item.get("path") or f"file-{item.get('index')}")
        child = dict(item)
        child.update({"name": name, "type": "file"})
        node.setdefault("children", {})[name] = child
    def finalize(node: dict) -> dict:
        if node.get("type") == "file":
            return node
        children = [finalize(v) for v in node.get("children", {}).values()]
        children.sort(key=lambda x: (x.get("type") != "directory", str(x.get("name") or "").lower()))
        node["children"] = children
        node["size"] = sum(int(c.get("size") or 0) for c in children)
        node["size_h"] = human_size(node["size"])
        return node
    return finalize(root)



def _torrent_file_remote_path(profile: dict, torrent_hash: str, index: int) -> tuple[dict, str]:
    c = client_for(profile)
    files = torrent_files(profile, torrent_hash)
    selected = next((f for f in files if int(f.get("index", -1)) == int(index)), None)
    if selected is None:
        available = ", ".join(str(f.get("index")) for f in files[:20]) or "none"
        raise ValueError(f"File index {index} not found. Available indexes: {available}")
    base = _remote_clean_path(_torrent_data_path(c, torrent_hash))
    rel = str(selected.get("path") or "").lstrip("/")
    if len(files) == 1 and base and not base.endswith("/"):
        path = base
    else:
        path = _remote_join(base, rel)
    return selected, path


def download_tmp_dir() -> str:
    PYTORRENT_TMP_DIR.mkdir(parents=True, exist_ok=True)
    return str(PYTORRENT_TMP_DIR)


def _remote_readability_error(c: ScgiRtorrentClient, source_path: str) -> str | None:
    script = (
        'p=$1; '
        'command -v base64 >/dev/null 2>&1 || { echo "base64 command not found on rTorrent host"; exit 0; }; '
        '[ -e "$p" ] || { echo "source file does not exist"; exit 0; }; '
        '[ -f "$p" ] || { echo "source path is not a regular file"; exit 0; }; '
        '[ -r "$p" ] || { echo "source file is not readable by rTorrent"; exit 0; }; '
        'echo OK'
    )
    output = str(_rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-download-check", source_path) or "").strip()
    return None if output == "OK" else (output or "source file cannot be read by rTorrent")


def remote_file_readability_error(profile: dict, source_path: str) -> str | None:
    return _remote_readability_error(client_for(profile), source_path)


def iter_remote_file_chunks(profile: dict, source_path: str, size: int | None = None, chunk_size: int | None = None):
    c = client_for(profile)
    clean = _remote_clean_path(source_path)
    err = _remote_readability_error(c, clean)
    if err:
        raise RuntimeError(err)
    block_size = max(65536, int(chunk_size or REMOTE_READ_CHUNK_BYTES or 1048576))
    offset = 0
    emitted = 0
    script = (
        'p=$1; bs=$2; skip=$3; '
        'command -v base64 >/dev/null 2>&1 || { printf "ERR\tbase64 command not found on rTorrent host"; exit 0; }; '
        '[ -r "$p" ] || { printf "ERR\tsource file is not readable by rTorrent"; exit 0; }; '
        'dd if="$p" bs="$bs" skip="$skip" count=1 2>/dev/null | base64 | tr -d "\n"'
    )
    while size is None or emitted < int(size):
        output = str(_rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-download-read", clean, str(block_size), str(offset)) or "")
        if output.startswith("ERR\t"):
            raise RuntimeError(output.split("\t", 1)[1] or "remote read failed")
        if not output:
            break
        try:
            chunk = __import__("base64").b64decode(output, validate=False)
        except Exception as exc:
            raise RuntimeError(f"remote read returned invalid base64: {exc}") from exc
        if not chunk:
            break
        yield chunk
        emitted += len(chunk)
        offset += 1
        if size is not None and emitted >= int(size):
            break


def torrent_download_file_info(profile: dict, torrent_hash: str, index: int) -> dict:
    selected, remote_path = _torrent_file_remote_path(profile, torrent_hash, index)
    err = remote_file_readability_error(profile, remote_path)
    if err:
        raise RuntimeError(err)
    return {**selected, "remote_path": remote_path, "download_name": LocalPath(str(selected.get("path") or remote_path)).name}


def torrent_download_zip_items(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> list[dict]:
    files = torrent_files(profile, torrent_hash)
    wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files}
    items = []
    for item in files:
        if int(item.get("index", -1)) not in wanted:
            continue
        _, remote_path = _torrent_file_remote_path(profile, torrent_hash, int(item["index"]))
        err = remote_file_readability_error(profile, remote_path)
        if err:
            raise RuntimeError(f"{item.get('path') or item.get('index')}: {err}")
        items.append({**item, "remote_path": remote_path})
    if not items:
        raise ValueError("No files selected")
    return items


def _remote_stage_path(c: ScgiRtorrentClient, source_path: str, suffix: str = "") -> str:
    token = uuid.uuid4().hex
    safe_suffix = ''.join(ch if ch.isalnum() or ch in '.-_' else '_' for ch in str(suffix or ''))[:80]
    target = f"{download_tmp_dir().rstrip('/')}/pytorrent-download-{token}{safe_suffix}"
    script = (
        'src=$1; dst=$2; '
        'if [ ! -f "$src" ]; then echo "ERR\tmissing source"; exit 0; fi; '
        'cp -- "$src" "$dst" 2>/tmp/pytorrent-cp-err-$$ || { rc=$?; err=$(cat /tmp/pytorrent-cp-err-$$ 2>/dev/null); rm -f /tmp/pytorrent-cp-err-$$; printf "ERR\t%s\t%s\n" "$rc" "$err"; exit 0; }; '
        'rm -f /tmp/pytorrent-cp-err-$$; chmod 0644 "$dst" 2>/dev/null || true; printf "OK\t%s\n" "$dst"'
    )
    output = str(_rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-stage-file", source_path, target) or "").strip()
    parts = (output.splitlines()[0] if output else "").split("\t", 2)
    if len(parts) >= 2 and parts[0] == "OK":
        return parts[1]
    detail = parts[2] if len(parts) > 2 else (parts[1] if len(parts) > 1 else output)
    raise RuntimeError(detail or "Cannot stage file through rTorrent")


def _remote_stage_zip(c: ScgiRtorrentClient, files: list[dict], suffix: str = ".zip") -> str:
    if not files:
        raise ValueError("No files selected")
    token = uuid.uuid4().hex
    tmp_base = download_tmp_dir().rstrip("/")
    list_path = f"{tmp_base}/pytorrent-zip-list-{token}.txt"
    zip_path = f"{tmp_base}/pytorrent-download-{token}{suffix}"
    lines = []
    for item in files:
        src = str(item.get("remote_path") or "")
        arc = str(item.get("path") or LocalPath(src).name).lstrip("/") or LocalPath(src).name
        lines.append(src.replace("\t", " ") + "\t" + arc.replace("\t", " "))
    list_data = "\n".join(lines)
    script = (
        'list=$1; zip=$2; data=$3; umask 022; printf "%s\n" "$data" > "$list"; '
        'rm -f "$zip"; tmpdir=$(mktemp -d /tmp/pytorrent-zip-XXXXXX) || exit 3; '
        'rc=0; while IFS=$(printf "\\t") read -r src arc; do '
        '[ -n "$src" ] || continue; '
        'if [ ! -f "$src" ]; then echo "missing source: $src" >&2; rc=4; break; fi; '
        'case "$arc" in /*|../*|*/../*) echo "unsafe zip path: $arc" >&2; rc=5; break;; esac; '
        'dir=${arc%/*}; if [ "$dir" != "$arc" ]; then mkdir -p "$tmpdir/$dir" || { rc=$?; break; }; fi; cp -- "$src" "$tmpdir/$arc" || { rc=$?; break; }; '
        'done; if [ $rc -eq 0 ]; then (cd "$tmpdir" && zip -qr "$zip" .) || rc=$?; fi; '
        'rm -rf "$tmpdir" "$list"; '
        'if [ $rc -eq 0 ] && [ -f "$zip" ]; then chmod 0644 "$zip" 2>/dev/null || true; printf "OK\t%s\n" "$zip"; else printf "ERR\t%s\n" "$rc"; fi'
    )
    output = str(_rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-stage-zip", list_path, zip_path, list_data) or "").strip()
    parts = (output.splitlines()[0] if output else "").split("\t", 1)
    if len(parts) == 2 and parts[0] == "OK":
        return parts[1]
    raise RuntimeError(output or "Cannot create ZIP through rTorrent")


def _remote_remove_staged(profile: dict, path: str) -> None:
    clean = str(path or "")
    tmp_prefix = download_tmp_dir().rstrip("/") + "/pytorrent-download-"
    if not clean.startswith(tmp_prefix):
        return
    try:
        _rt_execute(client_for(profile), "execute.throw", "rm", "-f", clean)
    except Exception:
        pass


def torrent_staged_file_path(profile: dict, torrent_hash: str, index: int) -> dict:
    c = client_for(profile)
    selected, remote_path = _torrent_file_remote_path(profile, torrent_hash, index)
    suffix = LocalPath(str(selected.get("path") or "file")).suffix
    staged = _remote_stage_path(c, remote_path, suffix)
    return {**selected, "remote_path": remote_path, "staged_path": staged, "download_name": LocalPath(str(selected.get("path") or staged)).name}


def torrent_staged_zip_path(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> dict:
    c = client_for(profile)
    files = torrent_files(profile, torrent_hash)
    wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files}
    items = []
    for item in files:
        if int(item.get("index", -1)) not in wanted:
            continue
        _, remote_path = _torrent_file_remote_path(profile, torrent_hash, int(item["index"]))
        items.append({**item, "remote_path": remote_path})
    staged = _remote_stage_zip(c, items)
    return {"staged_path": staged, "count": len(items)}


def _torrent_raw_from_method(c: ScgiRtorrentClient, torrent_hash: str) -> bytes | None:
    for method in ("d.get_metafile", "d.metafile"):
        try:
            value = c.call(method, torrent_hash)
        except Exception:
            continue
        if hasattr(value, "data"):
            data = value.data
        elif isinstance(value, bytes):
            data = value
        elif isinstance(value, str):
            data = value.encode("latin-1", "ignore")
        else:
            data = None
        if data:
            return bytes(data)
    return None


def _torrent_source_file(c: ScgiRtorrentClient, torrent_hash: str) -> str:
    for method in ("d.tied_to_file", "d.get_tied_to_file", "d.loaded_file", "d.get_loaded_file", "d.session_file", "d.get_session_file"):
        try:
            value = str(c.call(method, torrent_hash) or "").strip()
        except Exception:
            continue
        if value:
            return value
    return ""


def export_torrent_file(profile: dict, torrent_hash: str) -> dict:
    c = client_for(profile)
    name = str(c.call("d.name", torrent_hash) or torrent_hash).strip() or torrent_hash
    filename = f"{name}.torrent" if not name.lower().endswith(".torrent") else name
    raw = _torrent_raw_from_method(c, torrent_hash)
    if raw:
        target = LocalPath(download_tmp_dir()) / f"pytorrent-download-{uuid.uuid4().hex}.torrent"
        target.write_bytes(raw)
        return {"path": str(target), "download_name": filename, "local": True}
    source = _torrent_source_file(c, torrent_hash)
    if not source:
        raise RuntimeError("Cannot find torrent source file in rTorrent")
    staged = _remote_stage_path(c, source, ".torrent")
    return {"path": staged, "download_name": filename, "local": False}

def set_folder_priority(profile: dict, torrent_hash: str, folder_path: str, priority: int) -> dict:
    # Note: Folder priority applies the same rTorrent file priority to every descendant path.
    folder = str(folder_path or "").strip().strip("/")
    updates = []
    for item in torrent_files(profile, torrent_hash):
        path = str(item.get("path") or "").strip("/")
        if not folder or path == folder or path.startswith(folder + "/"):
            updates.append({"index": item["index"], "priority": int(priority)})
    if not updates:
        return {"updated": [], "errors": [{"folder": folder_path, "error": "No files matched folder"}]}
    return set_file_priorities(profile, torrent_hash, updates)


def torrent_local_file_path(profile: dict, torrent_hash: str, index: int) -> str:
    c = client_for(profile)
    files = torrent_files(profile, torrent_hash)
    selected = next((f for f in files if int(f.get("index", -1)) == int(index)), None)
    if not selected:
        raise ValueError("File index not found")
    base = _remote_clean_path(_torrent_data_path(c, torrent_hash))
    rel = str(selected.get("path") or "").lstrip("/")
    if len(files) == 1 and base and not base.endswith("/"):
        path = base
    else:
        path = _remote_join(base, rel)
    # Note: HTTP file serving is enabled only for local profiles to avoid pretending remote files exist locally.
    if int(profile.get("is_remote") or 0):
        raise ValueError("HTTP file download is available only for local rTorrent profiles")
    local = LocalPath(path).resolve()
    if not local.exists() or not local.is_file():
        raise FileNotFoundError(f"Local file is not available: {local}")
    return str(local)


def torrent_local_file_paths(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> list[dict]:
    files = torrent_files(profile, torrent_hash)
    wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files}
    out = []
    for item in files:
        if int(item.get("index", -1)) not in wanted:
            continue
        out.append({**item, "local_path": torrent_local_file_path(profile, torrent_hash, int(item["index"]))})
    return out

def torrent_peers(profile: dict, torrent_hash: str) -> list[dict]:
    fields = [
        "p.address=", "p.client_version=", "p.completed_percent=", "p.down_rate=",
        "p.up_rate=", "p.port=", "p.is_encrypted=", "p.is_incoming=",
        "p.is_snubbed=", "p.is_banned=",
    ]
    try:
        rows = client_for(profile).p.multicall(torrent_hash, "", *fields)
    except Exception:
        fields = ["p.address=", "p.client_version=", "p.completed_percent=", "p.down_rate=", "p.up_rate=", "p.port=", "p.is_encrypted="]
        rows = client_for(profile).p.multicall(torrent_hash, "", *fields)
    peers = []
    for idx, r in enumerate(rows):
        peers.append({
            "index": idx,
            "ip": r[0],
            "client": r[1],
            "completed": int(r[2] or 0),
            "down_rate": int(r[3] or 0),
            "down_rate_h": human_rate(r[3] or 0),
            "up_rate": int(r[4] or 0),
            "up_rate_h": human_rate(r[4] or 0),
            "port": int(r[5] or 0),
            "encrypted": bool(r[6]) if len(r) > 6 else False,
            "incoming": bool(r[7]) if len(r) > 7 else False,
            "snubbed": bool(r[8]) if len(r) > 8 else False,
            "banned": bool(r[9]) if len(r) > 9 else False,
        })
    return peers




def _call_first(c: ScgiRtorrentClient, candidates: list[tuple[str, tuple]]) -> dict:
    errors = []
    for method, args in candidates:
        try:
            result = c.call(method, *args)
            return {"ok": True, "method": method, "result": result}
        except Exception as exc:
            errors.append(f"{method}: {exc}")
    raise RuntimeError("; ".join(errors))



def _tracker_domain(url: str) -> str:
    raw = str(url or '').strip()
    if not raw:
        return ''
    parsed = urlparse(raw if '://' in raw else f'http://{raw}')
    host = (parsed.hostname or '').lower().strip('.')
    if host.startswith('www.'):
        host = host[4:]
    return host


def tracker_summary(profile: dict, torrent_hashes: list[str] | None = None, limit: int = 1000) -> dict:
    """Return tracker domains grouped by torrent for the sidebar filter."""
    # Note: Tracker summary is read-only and isolated from the normal torrent snapshot, so slow tracker RPC calls cannot break the main list.
    hashes = [str(h or '').strip() for h in (torrent_hashes or []) if str(h or '').strip()]
    if not hashes:
        hashes = [t.get('hash') for t in list_torrents(profile) if t.get('hash')]
    hashes = hashes[:max(1, int(limit or 1000))]
    by_hash: dict[str, list[dict]] = {}
    counts: dict[str, dict] = {}
    errors = []
    for h in hashes:
        try:
            items = []
            seen = set()
            for tr in torrent_trackers(profile, h):
                url = str(tr.get('url') or '')
                domain = _tracker_domain(url)
                if not domain or domain in seen:
                    continue
                seen.add(domain)
                item = {'domain': domain, 'url': url}
                items.append(item)
                row = counts.setdefault(domain, {'domain': domain, 'url': url, 'count': 0})
                row['count'] += 1
            by_hash[h] = items
        except Exception as exc:
            errors.append({'hash': h, 'error': str(exc)})
            by_hash[h] = []
    trackers = sorted(counts.values(), key=lambda x: (-int(x.get('count') or 0), str(x.get('domain') or '')))
    return {'hashes': by_hash, 'trackers': trackers, 'errors': errors, 'scanned': len(hashes)}

def _safe_tracker_call(c: ScgiRtorrentClient, method: str, target: str, default=None):
    try:
        return c.call(method, target)
    except Exception:
        return default


def _tracker_target(torrent_hash: str, index: int) -> str:
    return f"{torrent_hash}:t{int(index)}"

def _tracker_int(value, default=None):
    try:
        if value is None or value == "":
            return default
        return int(value)
    except Exception:
        return default


def _tracker_rows(c: ScgiRtorrentClient, torrent_hash: str) -> list[list]:
    fields = ("t.url=", "t.is_enabled=", "t.scrape_complete=", "t.scrape_incomplete=", "t.scrape_downloaded=")
    errors: list[str] = []
    for args in ((torrent_hash, "", *fields), ("", torrent_hash, *fields)):
        try:
            rows = c.call("t.multicall", *args)
            return [list(r) for r in (rows or [])]
        except Exception as exc:
            errors.append(f"t.multicall{args[:2]}: {exc}")
    # Note: Fallback keeps the sidebar tracker filter usable on rTorrent builds without t.multicall scrape fields.
    total = _tracker_int(_safe_tracker_call(c, "d.tracker_size", torrent_hash, 0), 0) or 0
    rows: list[list] = []
    for index in range(max(0, total)):
        target = _tracker_target(torrent_hash, index)
        url = _safe_tracker_call(c, "t.url", target, "")
        if not url:
            for args in ((torrent_hash, index), ("", torrent_hash, index)):
                try:
                    url = c.call("t.url", *args)
                    break
                except Exception:
                    continue
        if url:
            enabled = _safe_tracker_call(c, "t.is_enabled", target, 1)
            rows.append([url, enabled, None, None, None])
    if rows:
        return rows
    raise RuntimeError("Cannot read trackers: " + "; ".join(errors))


def torrent_trackers(profile: dict, torrent_hash: str) -> list[dict]:
    c = client_for(profile)
    rows = _tracker_rows(c, torrent_hash)
    trackers = []
    for idx, r in enumerate(rows):
        target = _tracker_target(torrent_hash, idx)
        last_announce = _safe_tracker_call(c, "t.activity_time_last", target, 0)
        scrape_time = _safe_tracker_call(c, "t.scrape_time_last", target, 0)
        if not last_announce:
            last_announce = scrape_time
        next_announce = _safe_tracker_call(c, "t.activity_time_next", target, 0)
        raw_seeds = _tracker_int(r[2], None)
        raw_peers = _tracker_int(r[3], None)
        raw_downloaded = _tracker_int(r[4], None)
        has_scrape = bool(_tracker_int(scrape_time, 0)) or raw_seeds not in (None, 0) or raw_peers not in (None, 0) or raw_downloaded not in (None, 0)
        trackers.append({
            "index": idx,
            "url": str(r[0] or ""),
            "enabled": bool(r[1]),
            "seeds": raw_seeds if has_scrape else None,
            "peers": raw_peers if has_scrape else None,
            "downloaded": raw_downloaded if has_scrape else None,
            "has_scrape": has_scrape,
            "last_announce": int(last_announce or 0),
            "next_announce": int(next_announce or 0),
        })
    return trackers

def tracker_action(profile: dict, torrent_hash: str, action_name: str, payload: dict | None = None) -> dict:
    payload = payload or {}
    c = client_for(profile)
    if action_name == "reannounce":
        return _call_first(c, [
            ("d.tracker_announce", (torrent_hash,)),
            ("d.tracker_announce", ("", torrent_hash)),
            ("d.tracker_announce.force", (torrent_hash,)),
        ])
    if action_name == "add":
        url = str(payload.get("url") or "").strip()
        if not url:
            raise ValueError("Missing tracker URL")
        return _call_first(c, [
            ("d.tracker.insert", (torrent_hash, "", url)),
            ("d.tracker.insert", (torrent_hash, 0, url)),
            ("d.tracker.insert", ("", torrent_hash, "", url)),
        ])
    if action_name in {"delete", "remove"}:
        # Note: Deleting trackers is guarded to keep at least one tracker attached to the torrent.
        index = int(payload.get("index", -1))
        if index < 0:
            raise ValueError("Invalid tracker index")
        total = _tracker_int(_safe_tracker_call(c, "d.tracker_size", torrent_hash, 0), 0) or len(torrent_trackers(profile, torrent_hash))
        if total <= 1:
            raise ValueError("Cannot delete the last tracker")
        if index >= total:
            raise ValueError("Invalid tracker index")
        return _call_first(c, [
            ("d.tracker.remove", (torrent_hash, index)),
            ("d.tracker.remove", (torrent_hash, "", index)),
            ("d.tracker.erase", (torrent_hash, index)),
            ("d.tracker.erase", (torrent_hash, "", index)),
            ("d.tracker.delete", (torrent_hash, index)),
            ("d.tracker.delete", (torrent_hash, "", index)),
        ])
    raise ValueError(f"Unknown tracker action: {action_name}")


RTORRENT_CONFIG_FIELDS = [
    {"group": "Directories", "key": "directory.default", "label": "Default download directory", "type": "text"},
    {"group": "Directories", "key": "session.path", "label": "Session path", "type": "text"},
    {"group": "Directories", "key": "system.cwd", "label": "Working directory", "type": "text", "readonly": True},
    {"group": "Network", "key": "network.port_range", "label": "Incoming port range", "type": "text", "placeholder": "49164-49164"},
    {"group": "Network", "key": "network.port_random", "label": "Random incoming port", "type": "bool"},
    {"group": "Network", "key": "network.bind_address", "label": "Bind address", "type": "text", "placeholder": "0.0.0.0"},
    {"group": "Network", "key": "network.local_address", "label": "Local address", "type": "text"},
    {"group": "Network", "key": "network.max_open_files", "label": "Max open files", "type": "number"},
    {"group": "Network", "key": "network.max_open_sockets", "label": "Max open sockets", "type": "number"},
    {"group": "Network", "key": "network.http.max_open", "label": "Max HTTP connections", "type": "number"},
    {"group": "Network", "key": "network.http.ssl_verify_peer", "label": "Verify SSL peers", "type": "bool"},
    {"group": "Peers", "key": "throttle.min_peers.normal", "label": "Min peers downloading", "type": "number"},
    {"group": "Peers", "key": "throttle.max_peers.normal", "label": "Max peers downloading", "type": "number"},
    {"group": "Peers", "key": "throttle.min_peers.seed", "label": "Min peers seeding", "type": "number"},
    {"group": "Peers", "key": "throttle.max_peers.seed", "label": "Max peers seeding", "type": "number"},
    {"group": "Peers", "key": "trackers.numwant", "label": "Tracker numwant", "type": "number"},
    {"group": "Throttle", "key": "throttle.global_down.max_rate", "label": "Global download limit B/s", "type": "number"},
    {"group": "Throttle", "key": "throttle.global_up.max_rate", "label": "Global upload limit B/s", "type": "number"},
    {"group": "Throttle", "key": "throttle.max_downloads.global", "label": "Max active downloads", "type": "number"},
    {"group": "Throttle", "key": "throttle.max_uploads.global", "label": "Max active uploads", "type": "number"},
    {"group": "Throttle", "key": "throttle.max_downloads.div", "label": "Max downloads per throttle", "type": "number"},
    {"group": "Throttle", "key": "throttle.max_uploads.div", "label": "Max uploads per throttle", "type": "number"},
    {"group": "DHT / PEX", "key": "dht.mode", "label": "DHT mode", "type": "text", "placeholder": "disable/off/auto/on"},
    {"group": "DHT / PEX", "key": "dht.port", "label": "DHT port", "type": "number"},
    {"group": "DHT / PEX", "key": "protocol.pex", "label": "Peer exchange", "type": "bool"},
    {"group": "Protocol", "key": "protocol.encryption.set", "label": "Encryption flags", "type": "text", "placeholder": "allow_incoming,try_outgoing,enable_retry"},
    {"group": "Protocol", "key": "protocol.connection.leech", "label": "Leech connection type", "type": "text", "placeholder": "leech"},
    {"group": "Protocol", "key": "protocol.connection.seed", "label": "Seed connection type", "type": "text", "placeholder": "seed"},
    {"group": "Files", "key": "pieces.hash.on_completion", "label": "Hash check on completion", "type": "bool"},
    {"group": "Files", "key": "pieces.preload.type", "label": "Pieces preload type", "type": "number"},
    {"group": "Files", "key": "pieces.preload.min_size", "label": "Pieces preload min size", "type": "number"},
    {"group": "Files", "key": "pieces.preload.min_rate", "label": "Pieces preload min rate", "type": "number"},
    {"group": "Files", "key": "system.file.allocate", "label": "File allocation", "type": "number"},
    {"group": "Files", "key": "system.file.max_size", "label": "Max file size", "type": "number"},
    {"group": "System", "key": "system.umask", "label": "File umask", "type": "text", "placeholder": "0002"},
    {"group": "System", "key": "system.hostname", "label": "Hostname", "type": "text", "readonly": True},
    {"group": "System", "key": "system.client_version", "label": "Client version", "type": "text", "readonly": True},
    {"group": "System", "key": "system.library_version", "label": "Library version", "type": "text", "readonly": True},
]


def _normalize_config_value(meta: dict, value):
    if meta.get("type") == "bool":
        return "1" if str(value).lower() in {"1", "true", "yes", "on"} or value is True else "0"
    if meta.get("type") == "number":
        return str(int(value or 0))
    return str(value or "").strip()


def saved_config_overrides(profile_id: int, user_id: int | None = None) -> dict[str, dict]:
    user_id = user_id or default_user_id()
    with connect() as conn:
        rows = conn.execute(
            "SELECT key,value,baseline_value,apply_on_start,updated_at FROM rtorrent_config_overrides WHERE user_id=? AND profile_id=?",
            (user_id, int(profile_id)),
        ).fetchall()
    return {r["key"]: r for r in rows}


def get_config(profile: dict) -> dict:
    c = client_for(profile)
    saved = saved_config_overrides(int(profile["id"]))
    fields = []
    for meta in RTORRENT_CONFIG_FIELDS:
        item = dict(meta)
        saved_item = saved.get(meta["key"])
        try:
            item["value"] = _normalize_config_value(meta, c.call(meta["key"]))
            item["current_value"] = item["value"]
            item["ok"] = True
        except Exception as exc:
            item["value"] = ""
            item["current_value"] = ""
            item["ok"] = False
            item["error"] = str(exc)
        if saved_item:
            saved_value = _normalize_config_value(meta, saved_item.get("value"))
            baseline_raw = saved_item.get("baseline_value")
            if baseline_raw not in (None, ""):
                baseline_value = _normalize_config_value(meta, baseline_raw)
            else:
                baseline_value = _normalize_config_value(meta, item.get("current_value"))
            item["saved"] = True
            item["saved_value"] = saved_value
            item["baseline_value"] = baseline_value
            item["apply_on_start"] = bool(saved_item.get("apply_on_start"))
            item["changed"] = saved_value != baseline_value
        fields.append(item)
    return {"fields": fields, "apply_on_start": any(bool(v.get("apply_on_start")) for v in saved.values())}



def default_download_path(profile: dict) -> str:
    """Return rTorrent default download directory for the active profile."""
    c = client_for(profile)
    errors = []
    for method in ("directory.default", "system.cwd"):
        try:
            value = str(c.call(method) or "").strip()
            if value:
                return value
        except Exception as exc:
            errors.append(f"{method}: {exc}")
    raise RuntimeError("Cannot read rTorrent default download directory: " + "; ".join(errors))

def generate_config_text(values: dict) -> str:
    known = {f["key"]: f for f in RTORRENT_CONFIG_FIELDS}
    lines = []
    for key, value in (values or {}).items():
        meta = known.get(key)
        if not meta or meta.get("readonly"):
            continue
        normalized = _normalize_config_value(meta, value)
        if meta.get("type") == "text" and any(ch.isspace() for ch in normalized):
            normalized = '"' + normalized.replace('\\', '\\\\').replace('"', '\\"') + '"'
        lines.append(f"{key}.set = {normalized}")
    return "\n".join(lines) + ("\n" if lines else "")


def _read_rtorrent_config_value(client, key: str, meta: dict) -> str:
    return _normalize_config_value(meta, client.call(key))


def store_config_overrides(profile: dict, values: dict, apply_on_start: bool, baseline_values: dict | None = None, clear_keys: list[str] | None = None) -> list[str]:
    known = {f["key"]: f for f in RTORRENT_CONFIG_FIELDS}
    user_id = default_user_id()
    now = utcnow()
    profile_id = int(profile["id"])
    baseline_values = baseline_values or {}
    clear_set = set(clear_keys or [])
    stored = []
    with connect() as conn:
        for key in clear_set:
            if key in known:
                conn.execute(
                    "DELETE FROM rtorrent_config_overrides WHERE user_id=? AND profile_id=? AND key=?",
                    (user_id, profile_id, key),
                )
        for key, value in (values or {}).items():
            if key in clear_set:
                continue
            meta = known.get(key)
            if not meta or meta.get("readonly"):
                continue
            normalized = _normalize_config_value(meta, value)
            existing = conn.execute(
                "SELECT baseline_value FROM rtorrent_config_overrides WHERE user_id=? AND profile_id=? AND key=?",
                (user_id, profile_id, key),
            ).fetchone()
            existing_baseline = existing.get("baseline_value") if existing else None

            # Keep the first reference value forever until the override is cleared.
            # Without this, a second save could treat already-overridden rTorrent
            # values as the new baseline and the UI would stop marking them as changed.
            if existing_baseline not in (None, ""):
                baseline = _normalize_config_value(meta, existing_baseline)
            else:
                baseline = _normalize_config_value(meta, baseline_values.get(key)) if key in baseline_values else None

            if baseline not in (None, "") and normalized == baseline:
                conn.execute(
                    "DELETE FROM rtorrent_config_overrides WHERE user_id=? AND profile_id=? AND key=?",
                    (user_id, profile_id, key),
                )
                continue
            conn.execute(
                "INSERT OR REPLACE INTO rtorrent_config_overrides(user_id,profile_id,key,value,baseline_value,apply_on_start,updated_at) VALUES(?,?,?,?,?,?,?)",
                (user_id, profile_id, key, normalized, baseline, 1 if apply_on_start else 0, now),
            )
            stored.append(key)
        conn.execute(
            "UPDATE rtorrent_config_overrides SET apply_on_start=?, updated_at=? WHERE user_id=? AND profile_id=?",
            (1 if apply_on_start else 0, now, user_id, profile_id),
        )
    return stored


def set_config(profile: dict, values: dict, apply_now: bool = True, apply_on_start: bool = False, clear_keys: list[str] | None = None) -> dict:
    updated, errors = [], []
    known = {f["key"]: f for f in RTORRENT_CONFIG_FIELDS}
    c = client_for(profile)
    baseline_values = {}
    for key, raw_value in (values or {}).items():
        meta = known.get(key)
        if not meta or meta.get("readonly"):
            continue
        try:
            baseline_values[key] = _read_rtorrent_config_value(c, key, meta)
        except Exception:
            pass
    stored = store_config_overrides(profile, values, apply_on_start, baseline_values, clear_keys)
    if not apply_now:
        return {"ok": True, "updated": [], "stored": stored, "errors": []}
    for key, raw_value in (values or {}).items():
        if key not in known:
            continue
        meta = known[key]
        if meta.get("readonly"):
            continue
        value = _normalize_config_value(meta, raw_value)
        rpc_value = int(value) if meta.get("type") in {"bool", "number"} else value
        try:
            try:
                c.call(key + ".set", "", rpc_value)
            except Exception:
                c.call(key + ".set", rpc_value)
            updated.append(key)
        except Exception as exc:
            errors.append({"key": key, "error": str(exc)})
    return {"ok": not errors, "updated": updated, "stored": stored, "errors": errors}


def apply_startup_overrides(profile: dict) -> dict:
    rows = saved_config_overrides(int(profile["id"]))
    values = {k: v.get("value") for k, v in rows.items() if v.get("apply_on_start")}
    if not values:
        return {"ok": True, "updated": [], "errors": [], "skipped": True}
    return set_config(profile, values, apply_now=True, apply_on_start=True)


def _int_rpc(c: ScgiRtorrentClient, method: str, h: str, default: int = 0) -> int:
    try:
        return int(c.call(method, h) or 0)
    except Exception:
        return default


def _str_rpc(c: ScgiRtorrentClient, method: str, h: str, default: str = '') -> str:
    try:
        return str(c.call(method, h) or '')
    except Exception:
        return default


def _download_runtime_state(c: ScgiRtorrentClient, h: str) -> dict:
    """Read rTorrent state using the native pause model: stopped, paused or active."""
    state = _int_rpc(c, 'd.state', h)
    active = _int_rpc(c, 'd.is_active', h)
    opened = _int_rpc(c, 'd.is_open', h)
    # Note: In rTorrent, pause does not change d.state. Paused means state=1, open=1, active=0.
    return {
        'state': state,
        'open': opened,
        'active': active,
        'paused': bool(state and opened and not active),
        'stopped': not bool(state),
        'message': _str_rpc(c, 'd.message', h),
    }


def pause_hash(c: ScgiRtorrentClient, torrent_hash: str) -> dict:
    """Pause an active rTorrent item without stopping or closing it."""
    h = str(torrent_hash or '')
    if not h:
        return {'hash': h, 'ok': False, 'error': 'missing hash'}
    before = _download_runtime_state(c, h)
    result = {'hash': h, 'before': before, 'commands': []}
    try:
        if before.get('stopped'):
            # Note: rTorrent does not turn a stopped item into a paused one with d.pause alone.
            # First move it out of STOP, then pause it, which matches the expected START -> PAUSE flow.
            try:
                c.call('d.open', h)
                result['commands'].append('d.open')
            except Exception as exc:
                result.setdefault('ignored_errors', []).append(f'd.open: {exc}')
            c.call('d.start', h)
            result['commands'].append('d.start')
        # Note: Smart Queue frees a slot with d.pause, not d.stop, so later d.resume behaves like ruTorrent.
        c.call('d.pause', h)
        result['commands'].append('d.pause')
        result['after'] = _download_runtime_state(c, h)
        result['ok'] = True
    except Exception as exc:
        result.update({'ok': False, 'error': str(exc), 'after': _download_runtime_state(c, h)})
    return result


def stop_hash(c: ScgiRtorrentClient, torrent_hash: str) -> dict:
    """Stop an active rTorrent item without using pause semantics."""
    h = str(torrent_hash or '')
    if not h:
        return {'hash': h, 'ok': False, 'error': 'missing hash'}
    before = _download_runtime_state(c, h)
    result = {'hash': h, 'before': before, 'commands': []}
    if before.get('stopped'):
        result.update({'ok': True, 'skipped': 'already_stopped', 'after': before})
        return result
    try:
        # Note: Smart Queue now enforces the queue with d.stop only; user-paused torrents stay untouched.
        c.call('d.stop', h)
        result['commands'].append('d.stop')
        result['after'] = _download_runtime_state(c, h)
        result['ok'] = True
    except Exception as exc:
        result.update({'ok': False, 'error': str(exc), 'after': _download_runtime_state(c, h)})
    return result


def resume_paused_hash(c: ScgiRtorrentClient, torrent_hash: str) -> dict:
    """Resume only a paused rTorrent item; never convert it through stop/start."""
    h = str(torrent_hash or '')
    if not h:
        return {'hash': h, 'ok': False, 'error': 'missing hash'}
    before = _download_runtime_state(c, h)
    result: dict = {'hash': h, 'before': before, 'commands': []}
    if before.get('stopped'):
        result.update({'ok': False, 'skipped': 'stopped_not_paused', 'after': before})
        return result
    if before.get('active'):
        result.update({'ok': True, 'skipped': 'already_active', 'after': before})
        return result
    try:
        # Note: ruTorrent unpauses with the equivalent of d.resume. Do not add d.start/d.open,
        # because those commands belong to Stopped/Open state, not a clean Paused state.
        c.call('d.resume', h)
        result['commands'].append('d.resume')
        result['after'] = _download_runtime_state(c, h)
        result['ok'] = True
    except Exception as exc:
        result.update({'ok': False, 'error': str(exc), 'after': _download_runtime_state(c, h)})
    return result


def start_or_resume_hash(c: ScgiRtorrentClient, torrent_hash: str, prefer_start: bool = False) -> dict:
    """Start stopped torrents or resume real paused torrents.

    Smart Queue passes prefer_start=True for candidates that were selected as stopped.
    This avoids treating rTorrent's intermediate open/inactive state after a check as
    a user pause and sending only d.resume, which can leave items pending forever.
    """
    h = str(torrent_hash or '')
    if not h:
        return {'hash': h, 'ok': False, 'error': 'missing hash'}
    before = _download_runtime_state(c, h)
    result: dict = {'hash': h, 'before': before, 'commands': []}

    if before.get('active'):
        result.update({'ok': True, 'skipped': 'already_active', 'after': before})
        return result

    if before.get('paused') and not prefer_start:
        # Note: Manual Start keeps the clean pause-to-resume path. Do not classify every
        # state=1/active=0 item as paused; after auto-check this can be only a transient
        # open/inactive rTorrent state and needs d.open + d.start.
        resumed = resume_paused_hash(c, h)
        resumed['mode'] = 'resume_paused'
        return resumed

    try:
        c.call('d.open', h)
        result['commands'].append('d.open')
    except Exception as exc:
        result.setdefault('ignored_errors', []).append(f'd.open: {exc}')
    try:
        c.call('d.start', h)
        result['commands'].append('d.start')
    except Exception as exc:
        result.setdefault('ignored_errors', []).append(f'd.start: {exc}')
        try:
            c.call('d.try_start', h)
            result['commands'].append('d.try_start')
        except Exception as exc2:
            result.setdefault('ignored_errors', []).append(f'd.try_start: {exc2}')
            result['ok'] = False
    result['after'] = _download_runtime_state(c, h)
    result['ok'] = result.get('ok', True)
    return result

def action(profile: dict, torrent_hashes: list[str], name: str, payload: dict | None = None) -> dict:
    payload = payload or {}
    c = client_for(profile)
    methods = {
        "stop": "d.stop",
        "recheck": "d.check_hash",
        "reannounce": "d.tracker_announce",
        "remove": "d.erase",
    }
    if name == "set_label":
        label = str(payload.get("label") or "").strip()
        for h in torrent_hashes:
            c.call("d.custom1.set", h, label)
        return {"ok": True, "count": len(torrent_hashes), "label": label}
    if name == "set_ratio_group":
        group = str(payload.get("ratio_group") or "").strip()
        for h in torrent_hashes:
            c.call("d.custom.set", h, "py_ratio_group", group)
        return {"ok": True, "count": len(torrent_hashes), "ratio_group": group}
    if name == "move":
        path = _remote_clean_path(payload.get("path") or "")
        move_data = bool(payload.get("move_data"))
        recheck = bool(payload.get("recheck", move_data))
        keep_seeding = bool(payload.get("keep_seeding"))
        # Note: Automations can force seeding after a physical move even if the torrent was not active before.
        if not path:
            raise ValueError("Missing path")
        results = []
        if move_data:
            _rt_execute_allow_timeout(c, "execute.throw", "mkdir", "-p", path)
        for h in torrent_hashes:
            item = {"hash": h, "path": path, "move_data": move_data, "keep_seeding": keep_seeding}
            try:
                was_state = int(c.call("d.state", h) or 0)
            except Exception:
                was_state = 0
            try:
                was_active = int(c.call("d.is_active", h) or 0)
            except Exception:
                was_active = was_state
            if move_data:
                src = _remote_clean_path(_torrent_data_path(c, h))
                if not src:
                    raise ValueError(f"Cannot determine source path for {h}")
                dst = _remote_join(path, posixpath.basename(src.rstrip("/")))
                if src != dst:
                    try:
                        c.call("d.stop", h)
                    except Exception:
                        pass
                    try:
                        c.call("d.close", h)
                    except Exception:
                        pass
                    _run_remote_move(c, src, dst)
                    item["moved_from"] = src
                    item["moved_to"] = dst
                else:
                    item["skipped"] = "source and destination are the same"
                c.call("d.directory.set", h, path)
                if recheck:
                    try:
                        c.call("d.check_hash", h)
                    except Exception as exc:
                        item["recheck_error"] = str(exc)
                if keep_seeding or was_state or was_active:
                    try:
                        c.call("d.start", h)
                        item["started_after_move"] = True
                    except Exception as exc:
                        item["start_after_move_error"] = str(exc)
            else:
                c.call("d.directory.set", h, path)
            results.append(item)
        return {"ok": True, "count": len(torrent_hashes), "move_data": move_data, "keep_seeding": keep_seeding, "results": results}
    if name == "pause":
        # Note: The app pause action is now a pure d.pause so later resume works without stop/start.
        results = [pause_hash(c, h) for h in torrent_hashes]
        return {"ok": True, "count": len(torrent_hashes), "remove_data": False, "results": results}
    if name in {"resume", "unpause"}:
        # Note: Resume/Unpause uses only d.resume for Paused state.
        results = [resume_paused_hash(c, h) for h in torrent_hashes]
        return {"ok": True, "count": len(torrent_hashes), "remove_data": False, "results": results}
    if name == "start":
        # Note: Start separates Stopped from Paused; paused items go through d.resume, stopped items through d.start.
        results = [start_or_resume_hash(c, h) for h in torrent_hashes]
        return {"ok": True, "count": len(torrent_hashes), "remove_data": False, "results": results}

    method = methods.get(name)
    if not method:
        raise ValueError(f"Unknown action: {name}")
    remove_data = bool(payload.get("remove_data")) if name == "remove" else False
    results = []
    for h in torrent_hashes:
        if remove_data:
            results.append(_remove_torrent_data(c, h))
        c.call(method, h)
        if name == "recheck":
            # Note: Recheck is tracked so even very fast checks still receive the after-check start/stop policy.
            _mark_post_check_watch(int(profile.get("id") or 0), h)
    return {"ok": True, "count": len(torrent_hashes), "remove_data": remove_data, "results": results}

def add_magnet(profile: dict, uri: str, start: bool = True, directory: str = "", label: str = "") -> dict:
    c = client_for(profile)
    commands = []
    if directory:
        commands.append(f"d.directory.set={directory}")
    if label:
        commands.append(f"d.custom1.set={label}")
    if start:
        c.load.start_verbose("", uri, *commands)
    else:
        c.load.normal("", uri, *commands)
    return {"ok": True}


def set_limits(profile: dict, down: int | None, up: int | None):
    """Set global speed limits in bytes/s.

    rTorrent XML-RPC setters need an empty target string as the first
    argument. Without it rTorrent returns: target must be a string.
    """
    c = client_for(profile)
    if down is not None:
        c.call("throttle.global_down.max_rate.set", "", int(down))
    if up is not None:
        c.call("throttle.global_up.max_rate.set", "", int(up))
    return {"ok": True, "down": int(down or 0), "up": int(up or 0)}


def add_torrent_raw(profile: dict, data: bytes, start: bool = True, directory: str = "", label: str = "", file_priorities: list[dict] | None = None) -> dict:
    c = client_for(profile)
    commands = []
    if directory:
        commands.append(f"d.directory.set={directory}")
    if label:
        commands.append(f"d.custom1.set={label}")
    # Note: File selection before start loads the torrent stopped, changes priorities, then starts it if requested.
    method = "load.raw" if file_priorities else ("load.raw_start" if start else "load.raw")
    c.call(method, "", Binary(data), *commands)
    info_hash = ""
    if file_priorities:
        try:
            from .torrent_meta import parse_torrent
            info_hash = parse_torrent(data).get("info_hash") or ""
            set_file_priorities(profile, info_hash, file_priorities)
            if start:
                c.call("d.start", info_hash)
        except Exception as exc:
            return {"ok": False, "info_hash": info_hash, "error": str(exc)}
    return {"ok": True, "info_hash": info_hash}
