from __future__ import annotations

from .client import *


def torrent_files(profile: dict, torrent_hash: str) -> list[dict]:
    """Return the flat file list of one torrent.

    Issues one ``f.multicall`` against the client returned by
    ``client_for(profile)`` and converts every row into a dict with the keys
    ``index``, ``path``, ``size``, ``size_h``, ``completed_chunks``,
    ``size_chunks``, ``progress`` and ``priority``.

    ``progress`` is a percentage clamped to ``0.0..100.0``. Files whose size is
    zero (or negative) are reported as 100% complete; files with no chunks are
    reported as 0%.
    """
    rows = client_for(profile).f.multicall(
        torrent_hash,
        "",
        "f.path=",
        "f.size_bytes=",
        "f.completed_chunks=",
        "f.size_chunks=",
        "f.priority=",
    )
    files = []
    for idx, r in enumerate(rows):
        size = int(r[1] or 0)
        completed_chunks = int(r[2] or 0)
        size_chunks = int(r[3] or 0)
        if size <= 0:
            # Nothing to download for an empty file, so it counts as done.
            progress = 100.0
        elif size_chunks:
            progress = round((completed_chunks / size_chunks) * 100, 2)
        else:
            progress = 0.0
        files.append({
            "index": idx,
            "path": r[0],
            "size": size,
            "size_h": human_size(size),
            "completed_chunks": completed_chunks,
            "size_chunks": size_chunks,
            "progress": min(100.0, max(0.0, progress)),
            "priority": int(r[4] or 0),
        })
    return files


def torrent_file_tree(profile: dict, torrent_hash: str) -> dict:
    """Return the torrent's files as a nested directory tree.

    The root node and every directory node carry ``name``, ``path``, ``type``
    (``"directory"``), ``size``, ``size_h`` and ``children``. File nodes are
    copies of the ``torrent_files`` entries with ``name`` and ``type``
    (``"file"``) added. In the returned tree ``children`` is a list sorted with
    directories first, then case-insensitively by name, and a directory's
    ``size`` is the sum of its children's sizes.
    """
    # Note: The tree is built from rTorrent file paths without changing the existing flat file API.
    root = {"name": "", "path": "", "type": "directory", "size": 0, "children": {}}
    for item in torrent_files(profile, torrent_hash):
        parts = [part for part in str(item.get("path") or "").split("/") if part]
        node = root
        prefix: list[str] = []
        # Walk (and create on demand) every directory above the file itself.
        for part in parts[:-1]:
            prefix.append(part)
            children = node.setdefault("children", {})
            node = children.setdefault(
                part,
                {"name": part, "path": "/".join(prefix), "type": "directory", "size": 0, "children": {}},
            )
        name = parts[-1] if parts else str(item.get("path") or f"file-{item.get('index')}")
        child = dict(item)
        child.update({"name": name, "type": "file"})
        node.setdefault("children", {})[name] = child

    def finalize(node: dict) -> dict:
        """Convert the children mapping into a sorted list and total the sizes."""
        if node.get("type") == "file":
            return node
        children = [finalize(v) for v in node.get("children", {}).values()]
        children.sort(key=lambda x: (x.get("type") != "directory", str(x.get("name") or "").lower()))
        node["children"] = children
        node["size"] = sum(int(c.get("size") or 0) for c in children)
        node["size_h"] = human_size(node["size"])
        return node

    return finalize(root)


def _torrent_file_remote_path(profile: dict, torrent_hash: str, index: int) -> tuple[dict, str]:
    """Return ``(file_entry, path)`` for the file at ``index`` of a torrent.

    The path is built from the torrent data path reported by
    ``_torrent_data_path``. When the torrent has exactly one file and the data
    path does not end in ``/``, the data path itself is used as the file path;
    otherwise the file's relative path is joined onto it.

    Raises:
        ValueError: if no file has the requested index. The message lists up to
            the first 20 available indexes.
    """
    c = client_for(profile)
    files = torrent_files(profile, torrent_hash)
    selected = next((f for f in files if int(f.get("index", -1)) == int(index)), None)
    if selected is None:
        available = ", ".join(str(f.get("index")) for f in files[:20]) or "none"
        raise ValueError(f"File index {index} not found. Available indexes: {available}")
    base = _remote_clean_path(_torrent_data_path(c, torrent_hash))
    rel = str(selected.get("path") or "").lstrip("/")
    if len(files) == 1 and base and not base.endswith("/"):
        path = base
    else:
        path = _remote_join(base, rel)
    return selected, path


def download_tmp_dir() -> str:
    """Create ``PYTORRENT_TMP_DIR`` if needed and return it as a string."""
    PYTORRENT_TMP_DIR.mkdir(parents=True, exist_ok=True)
    return str(PYTORRENT_TMP_DIR)


def _remote_readability_error(c: ScgiRtorrentClient, source_path: str) -> str | None:
    """Check through rTorrent that ``source_path`` can be streamed.

    Runs a small shell script via ``execute.capture`` that verifies ``base64``
    is available and that the path exists, is a regular file and is readable.

    Returns:
        ``None`` when the script prints ``OK``; otherwise the script's message,
        or a generic message when the script printed nothing.
    """
    script = (
        'p=$1; '
        'command -v base64 >/dev/null 2>&1 || { echo "base64 command not found on rTorrent host"; exit 0; }; '
        '[ -e "$p" ] || { echo "source file does not exist"; exit 0; }; '
        '[ -f "$p" ] || { echo "source path is not a regular file"; exit 0; }; '
        '[ -r "$p" ] || { echo "source file is not readable by rTorrent"; exit 0; }; '
        'echo OK'
    )
    output = str(
        _rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-download-check", source_path) or ""
    ).strip()
    return None if output == "OK" else (output or "source file cannot be read by rTorrent")


def remote_file_readability_error(profile: dict, source_path: str) -> str | None:
    """Profile-based wrapper around ``_remote_readability_error``."""
    return _remote_readability_error(client_for(profile), source_path)


def iter_remote_file_chunks(profile: dict, source_path: str, size: int | None = None, chunk_size: int | None = None):
    """Yield the bytes of a remote file, one block per rTorrent call.

    Each iteration runs ``dd … count=1 | base64`` through ``execute.capture``
    and decodes the captured text. ``dd``'s ``skip`` operand counts blocks of
    ``bs`` bytes, so the loop advances a block index rather than a byte offset.

    Args:
        profile: Profile passed to ``client_for``.
        source_path: Remote path; cleaned with ``_remote_clean_path`` first.
        size: Optional byte count after which iteration stops. When ``None``
            the generator runs until a read returns no data.
        chunk_size: Optional block size in bytes. Falls back to
            ``REMOTE_READ_CHUNK_BYTES`` and then 1 MiB; never below 64 KiB.

    Raises:
        RuntimeError: if the readability pre-check fails, the remote script
            reports an error, or the captured output is not valid base64.
    """
    # Imported locally so the name does not leak into the module globals that
    # the computed __all__ at the bottom of this file exports.
    import base64

    c = client_for(profile)
    clean = _remote_clean_path(source_path)
    err = _remote_readability_error(c, clean)
    if err:
        raise RuntimeError(err)
    block_size = max(65536, int(chunk_size or REMOTE_READ_CHUNK_BYTES or 1048576))
    offset = 0
    emitted = 0
    script = (
        'p=$1; bs=$2; skip=$3; '
        'command -v base64 >/dev/null 2>&1 || { printf "ERR\tbase64 command not found on rTorrent host"; exit 0; }; '
        '[ -r "$p" ] || { printf "ERR\tsource file is not readable by rTorrent"; exit 0; }; '
        'dd if="$p" bs="$bs" skip="$skip" count=1 2>/dev/null | base64 | tr -d "\n"'
    )
    # The loop condition also covers the "size reached" exit after each yield.
    while size is None or emitted < int(size):
        output = str(
            _rt_execute(
                c, "execute.capture", "sh", "-c", script, "pytorrent-download-read",
                clean, str(block_size), str(offset),
            )
            or ""
        )
        if output.startswith("ERR\t"):
            raise RuntimeError(output.split("\t", 1)[1] or "remote read failed")
        if not output:
            break
        try:
            chunk = base64.b64decode(output, validate=False)
        except ValueError as exc:  # binascii.Error is a ValueError subclass
            raise RuntimeError(f"remote read returned invalid base64: {exc}") from exc
        if not chunk:
            break
        yield chunk
        emitted += len(chunk)
        offset += 1


def torrent_download_file_info(profile: dict, torrent_hash: str, index: int) -> dict:
    """Return the file entry at ``index`` plus ``remote_path`` and ``download_name``.

    Raises:
        ValueError: if the index does not exist.
        RuntimeError: if the remote readability check reports a problem.
    """
    selected, remote_path = _torrent_file_remote_path(profile, torrent_hash, index)
    err = remote_file_readability_error(profile, remote_path)
    if err:
        raise RuntimeError(err)
    return {
        **selected,
        "remote_path": remote_path,
        "download_name": LocalPath(str(selected.get("path") or remote_path)).name,
    }


def torrent_download_zip_items(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> list[dict]:
    """Return the selected file entries, each extended with ``remote_path``.

    A falsy ``indexes`` (``None`` or an empty list) selects every file.

    Raises:
        RuntimeError: if any selected file fails the remote readability check;
            the message is prefixed with the file's path or index.
        ValueError: if nothing ends up selected.
    """
    files = torrent_files(profile, torrent_hash)
    wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files}
    items = []
    for item in files:
        if int(item.get("index", -1)) not in wanted:
            continue
        _, remote_path = _torrent_file_remote_path(profile, torrent_hash, int(item["index"]))
        err = remote_file_readability_error(profile, remote_path)
        if err:
            raise RuntimeError(f"{item.get('path') or item.get('index')}: {err}")
        items.append({**item, "remote_path": remote_path})
    if not items:
        raise ValueError("No files selected")
    return items


def _remote_stage_path(c: ScgiRtorrentClient, source_path: str, suffix: str = "") -> str:
    """Have rTorrent copy ``source_path`` to a uniquely named staging file.

    The target is ``<download_tmp_dir>/pytorrent-download-<uuid><suffix>``,
    where ``suffix`` is reduced to alphanumerics and ``.-_`` (other characters
    become ``_``) and truncated to 80 characters.

    NOTE(review): the staging directory is created by this process while the
    copy runs via rTorrent's ``execute.capture`` — presumably both see the same
    filesystem path; confirm for remote profiles.

    Returns:
        The staged path echoed back by the script.

    Raises:
        RuntimeError: if the script reports an error or prints nothing usable.
    """
    token = uuid.uuid4().hex
    safe_suffix = ''.join(ch if ch.isalnum() or ch in '.-_' else '_' for ch in str(suffix or ''))[:80]
    target = f"{download_tmp_dir().rstrip('/')}/pytorrent-download-{token}{safe_suffix}"
    script = (
        'src=$1; dst=$2; '
        'if [ ! -f "$src" ]; then echo "ERR\tmissing source"; exit 0; fi; '
        'cp -- "$src" "$dst" 2>/tmp/pytorrent-cp-err-$$ || { rc=$?; err=$(cat /tmp/pytorrent-cp-err-$$ 2>/dev/null); rm -f /tmp/pytorrent-cp-err-$$; printf "ERR\t%s\t%s\n" "$rc" "$err"; exit 0; }; '
        'rm -f /tmp/pytorrent-cp-err-$$; chmod 0644 "$dst" 2>/dev/null || true; printf "OK\t%s\n" "$dst"'
    )
    output = str(
        _rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-stage-file", source_path, target) or ""
    ).strip()
    # Only the first line is the status record: "OK\t<path>" or "ERR\t<rc>\t<message>".
    parts = (output.splitlines()[0] if output else "").split("\t", 2)
    if len(parts) >= 2 and parts[0] == "OK":
        return parts[1]
    detail = parts[2] if len(parts) > 2 else (parts[1] if len(parts) > 1 else output)
    raise RuntimeError(detail or "Cannot stage file through rTorrent")


def _remote_stage_zip(c: ScgiRtorrentClient, files: list[dict], suffix: str = ".zip") -> str:
    """Have rTorrent build a ZIP of ``files`` in the staging directory.

    Each entry contributes one ``<remote_path>\\t<archive path>`` line (tabs
    inside either field are replaced by spaces). The script writes those lines
    to a list file, copies every source into a scratch directory under its
    archive path, zips the scratch directory and removes scratch data.

    Args:
        c: rTorrent client used for ``execute.capture``.
        files: Entries with ``remote_path`` and, optionally, ``path``.
        suffix: Appended to the generated archive file name.

    Returns:
        The archive path echoed back by the script.

    Raises:
        ValueError: if ``files`` is empty.
        RuntimeError: if the script does not report ``OK``.
    """
    if not files:
        raise ValueError("No files selected")
    token = uuid.uuid4().hex
    tmp_base = download_tmp_dir().rstrip("/")
    list_path = f"{tmp_base}/pytorrent-zip-list-{token}.txt"
    zip_path = f"{tmp_base}/pytorrent-download-{token}{suffix}"
    lines = []
    for item in files:
        src = str(item.get("remote_path") or "")
        arc = str(item.get("path") or LocalPath(src).name).lstrip("/") or LocalPath(src).name
        lines.append(src.replace("\t", " ") + "\t" + arc.replace("\t", " "))
    list_data = "\n".join(lines)
    script = (
        'list=$1; zip=$2; data=$3; umask 022; printf "%s\n" "$data" > "$list"; '
        'rm -f "$zip"; tmpdir=$(mktemp -d /tmp/pytorrent-zip-XXXXXX) || exit 3; '
        'rc=0; while IFS=$(printf "\\t") read -r src arc; do '
        '[ -n "$src" ] || continue; '
        'if [ ! -f "$src" ]; then echo "missing source: $src" >&2; rc=4; break; fi; '
        # A bare ".." or a trailing "/.." would also step outside the scratch directory.
        'case "$arc" in /*|..|../*|*/..|*/../*) echo "unsafe zip path: $arc" >&2; rc=5; break;; esac; '
        'dir=${arc%/*}; if [ "$dir" != "$arc" ]; then mkdir -p "$tmpdir/$dir" || { rc=$?; break; }; fi; cp -- "$src" "$tmpdir/$arc" || { rc=$?; break; }; '
        # The loop must read the list file written above; without the
        # redirect it would consume the shell's stdin instead.
        'done < "$list"; if [ $rc -eq 0 ]; then (cd "$tmpdir" && zip -qr "$zip" .) || rc=$?; fi; '
        'rm -rf "$tmpdir" "$list"; '
        'if [ $rc -eq 0 ] && [ -f "$zip" ]; then chmod 0644 "$zip" 2>/dev/null || true; printf "OK\t%s\n" "$zip"; else printf "ERR\t%s\n" "$rc"; fi'
    )
    output = str(
        _rt_execute(c, "execute.capture", "sh", "-c", script, "pytorrent-stage-zip", list_path, zip_path, list_data)
        or ""
    ).strip()
    parts = (output.splitlines()[0] if output else "").split("\t", 1)
    if len(parts) == 2 and parts[0] == "OK":
        return parts[1]
    raise RuntimeError(output or "Cannot create ZIP through rTorrent")


def _remote_remove_staged(profile: dict, path: str) -> None:
    """Best-effort removal of a staged file through rTorrent.

    Only paths that start with ``<download_tmp_dir>/pytorrent-download-`` are
    touched; anything else is ignored.
    """
    clean = str(path or "")
    tmp_prefix = download_tmp_dir().rstrip("/") + "/pytorrent-download-"
    if not clean.startswith(tmp_prefix):
        return
    try:
        _rt_execute(client_for(profile), "execute.throw", "rm", "-f", clean)
    except Exception:
        # Deliberately swallowed: cleanup failure must not break the caller.
        pass


def torrent_staged_file_path(profile: dict, torrent_hash: str, index: int) -> dict:
    """Stage one torrent file through rTorrent and describe the result.

    Returns the file entry extended with ``remote_path``, ``staged_path`` and
    ``download_name``. The staged copy keeps the original file's suffix.
    """
    c = client_for(profile)
    selected, remote_path = _torrent_file_remote_path(profile, torrent_hash, index)
    suffix = LocalPath(str(selected.get("path") or "file")).suffix
    staged = _remote_stage_path(c, remote_path, suffix)
    return {
        **selected,
        "remote_path": remote_path,
        "staged_path": staged,
        "download_name": LocalPath(str(selected.get("path") or staged)).name,
    }


def torrent_staged_zip_path(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> dict:
    """Stage a ZIP of the selected files and return ``staged_path`` and ``count``.

    A falsy ``indexes`` selects every file. ``_remote_stage_zip`` raises
    ``ValueError`` when the selection is empty.
    """
    c = client_for(profile)
    files = torrent_files(profile, torrent_hash)
    wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files}
    items = []
    for item in files:
        if int(item.get("index", -1)) not in wanted:
            continue
        _, remote_path = _torrent_file_remote_path(profile, torrent_hash, int(item["index"]))
        items.append({**item, "remote_path": remote_path})
    staged = _remote_stage_zip(c, items)
    return {"staged_path": staged, "count": len(items)}


def _torrent_raw_from_method(c: ScgiRtorrentClient, torrent_hash: str) -> bytes | None:
    """Try to fetch the raw .torrent bytes directly from rTorrent.

    Probes ``d.get_metafile`` and ``d.metafile`` in order; a method that raises
    is skipped. Accepts a value exposing ``.data``, ``bytes`` or ``str``
    (encoded as latin-1, dropping unencodable characters).

    Returns:
        The first non-empty payload, or ``None`` if neither method yields one.
    """
    for method in ("d.get_metafile", "d.metafile"):
        try:
            value = c.call(method, torrent_hash)
        except Exception:
            # The method may not exist on this rTorrent build; try the next one.
            continue
        if hasattr(value, "data"):
            data = value.data
        elif isinstance(value, bytes):
            data = value
        elif isinstance(value, str):
            data = value.encode("latin-1", "ignore")
        else:
            data = None
        if data:
            return bytes(data)
    return None


def _torrent_source_file(c: ScgiRtorrentClient, torrent_hash: str) -> str:
    """Return the first non-empty source-file path rTorrent reports, or ``""``.

    Probes the tied/loaded/session file commands (with and without the
    ``get_`` prefix) in order, skipping any that raise.
    """
    methods = (
        "d.tied_to_file",
        "d.get_tied_to_file",
        "d.loaded_file",
        "d.get_loaded_file",
        "d.session_file",
        "d.get_session_file",
    )
    for method in methods:
        try:
            value = str(c.call(method, torrent_hash) or "").strip()
        except Exception:
            continue
        if value:
            return value
    return ""


def export_torrent_file(profile: dict, torrent_hash: str) -> dict:
    """Produce a downloadable .torrent file for one torrent.

    If rTorrent returns the raw metafile it is written locally into the
    download temp dir (``"local": True``). Otherwise the torrent's source file
    is staged through rTorrent (``"local": False``).

    Returns:
        A dict with ``path``, ``download_name`` and ``local``.

    Raises:
        RuntimeError: if neither the raw metafile nor a source file is found,
            or if staging fails.
    """
    c = client_for(profile)
    name = str(c.call("d.name", torrent_hash) or torrent_hash).strip() or torrent_hash
    filename = f"{name}.torrent" if not name.lower().endswith(".torrent") else name
    raw = _torrent_raw_from_method(c, torrent_hash)
    if raw:
        target = LocalPath(download_tmp_dir()) / f"pytorrent-download-{uuid.uuid4().hex}.torrent"
        target.write_bytes(raw)
        return {"path": str(target), "download_name": filename, "local": True}
    source = _torrent_source_file(c, torrent_hash)
    if not source:
        raise RuntimeError("Cannot find torrent source file in rTorrent")
    staged = _remote_stage_path(c, source, ".torrent")
    return {"path": staged, "download_name": filename, "local": False}


def set_file_priorities(profile: dict, torrent_hash: str, files: list[dict]) -> dict:
    """Set rTorrent file priorities for one torrent.

    Note: Keeps the existing /files/priority API behavior and returns per-file errors
    instead of failing the whole batch on one invalid item.

    Args:
        profile: Profile passed to ``client_for``.
        torrent_hash: Hash of the torrent whose files are updated.
        files: Items with ``index`` and ``priority`` (0..3). ``None`` or an
            empty list results in no updates.

    Returns:
        ``{"updated": [...], "errors": [...]}`` where each error records the
        offending item and the exception text.
    """
    c = client_for(profile)
    updated = []
    errors = []
    for item in files or []:
        try:
            index = int(item.get("index"))
            priority = int(item.get("priority"))
            if priority < 0 or priority > 3:
                raise ValueError("Priority must be between 0 and 3")
            target = f"{torrent_hash}:f{index}"
            c.call("f.priority.set", target, priority)
            updated.append({"index": index, "priority": priority})
        except Exception as exc:
            # Collected per item so one bad entry does not abort the batch.
            errors.append({"item": item, "error": str(exc)})
    return {"updated": updated, "errors": errors}


def set_folder_priority(profile: dict, torrent_hash: str, folder_path: str, priority: int) -> dict:
    """Apply one priority to every file at or below ``folder_path``.

    An empty folder path (after stripping whitespace and slashes) matches every
    file. When nothing matches, a single error entry is returned instead of
    calling ``set_file_priorities``.
    """
    # Note: Folder priority applies the same rTorrent file priority to every descendant path.
    folder = str(folder_path or "").strip().strip("/")
    updates = []
    for item in torrent_files(profile, torrent_hash):
        path = str(item.get("path") or "").strip("/")
        if not folder or path == folder or path.startswith(folder + "/"):
            updates.append({"index": item["index"], "priority": int(priority)})
    if not updates:
        return {"updated": [], "errors": [{"folder": folder_path, "error": "No files matched folder"}]}
    return set_file_priorities(profile, torrent_hash, updates)


def torrent_local_file_path(profile: dict, torrent_hash: str, index: int) -> str:
    """Return the resolved local filesystem path of one torrent file.

    Raises:
        ValueError: if the index does not exist, or the profile is remote.
        FileNotFoundError: if the resolved path is missing or not a file.
    """
    c = client_for(profile)
    files = torrent_files(profile, torrent_hash)
    selected = next((f for f in files if int(f.get("index", -1)) == int(index)), None)
    if not selected:
        raise ValueError("File index not found")
    base = _remote_clean_path(_torrent_data_path(c, torrent_hash))
    rel = str(selected.get("path") or "").lstrip("/")
    if len(files) == 1 and base and not base.endswith("/"):
        path = base
    else:
        path = _remote_join(base, rel)
    # Note: HTTP file serving is enabled only for local profiles to avoid pretending remote files exist locally.
    if int(profile.get("is_remote") or 0):
        raise ValueError("HTTP file download is available only for local rTorrent profiles")
    local = LocalPath(path).resolve()
    if not local.exists() or not local.is_file():
        raise FileNotFoundError(f"Local file is not available: {local}")
    return str(local)


def torrent_local_file_paths(profile: dict, torrent_hash: str, indexes: list[int] | None = None) -> list[dict]:
    """Return the selected file entries, each extended with ``local_path``.

    A falsy ``indexes`` selects every file. Errors from
    ``torrent_local_file_path`` propagate unchanged.
    """
    files = torrent_files(profile, torrent_hash)
    wanted = {int(x) for x in indexes} if indexes else {int(f["index"]) for f in files}
    out = []
    for item in files:
        if int(item.get("index", -1)) not in wanted:
            continue
        out.append({**item, "local_path": torrent_local_file_path(profile, torrent_hash, int(item["index"]))})
    return out


# Note: Keep split module exports compatible with the previous single rtorrent.py module.
__all__ = [
    name
    for name in globals()
    if not name.startswith("__") and name not in {"annotations"}
]