from __future__ import annotations from ._shared import * from ..services import pdf_preview_links, torrent_creator from ..services.reverse_dns import attach_reverse_dns @bp.get("/torrents") def torrents(): profile = preferences.active_profile() if not profile: return ok({"torrents": [], "summary": cached_summary(0, []), "error": "No rTorrent profile"}) rows = torrent_cache.snapshot(profile["id"]) return ok({ "profile_id": profile["id"], "torrents": rows, "summary": cached_summary(profile["id"], rows), "error": torrent_cache.error(profile["id"]), }) @bp.get("/trackers/summary") def trackers_summary(): profile = preferences.active_profile() if not profile: return ok({"summary": {"hashes": {}, "trackers": [], "errors": [], "scanned": 0, "pending": 0}, "error": "No profile"}) try: # Note: Tracker summary returns cached data immediately; optional warmup scans rTorrent in the background for very large libraries. scan_limit = min(250, max(0, int(request.args.get("scan_limit") or 0))) bg_limit = min(250, max(1, int(request.args.get("bg_limit") or 80))) warm = str(request.args.get("warm") or "").lower() in {"1", "true", "yes"} hashes = [t.get("hash") for t in torrent_cache.snapshot(profile["id"]) if t.get("hash")] prefs = preferences.get_preferences() include_favicons = bool(prefs and prefs.get("tracker_favicons_enabled")) loader = lambda h: rtorrent.torrent_trackers(profile, h) summary = tracker_cache.summary(profile, hashes, loader, scan_limit=scan_limit, include_favicons=include_favicons) if warm and int(summary.get("pending") or 0) > 0: summary["warming"] = tracker_cache.warm_summary_cache(profile, hashes, loader, batch_size=bg_limit) return ok({"summary": summary}) except Exception as exc: return ok({"summary": {"hashes": {}, "trackers": [], "errors": [{"error": str(exc)}], "scanned": 0, "pending": 0}, "error": str(exc)}) @bp.get("/trackers/favicon/") @bp.get("/tracker-favicon/") def tracker_favicon(domain: str): prefs = preferences.get_preferences() force = str(request.args.get("refresh") or "").lower() in {"1", "true", "yes", "force"} # Note: Manual refresh must work from CLI even when tracker favicons are disabled in Preferences. enabled = force or bool(prefs and prefs.get("tracker_favicons_enabled")) static_url = tracker_cache.favicon_public_url(domain, enabled=enabled, create=True, force=force) if static_url: # Note: The API only discovers/cache-warms the icon; the browser receives the file from /static/tracker_favicons/. return redirect(static_url, code=302) cached = tracker_cache.favicon_cache_row(domain) return jsonify({ "ok": False, "error": "favicon not found", "domain": tracker_cache.tracker_domain(domain), "enabled": bool(enabled), "cached_error": (cached or {}).get("error") if cached else None, }), 404 @bp.get("/trackers/favicon") def tracker_favicon_query(): # Note: Query-string alias makes cache warming easier from shell scripts where path routing/proxies may differ. domain = str(request.args.get("domain") or "").strip() if not domain: return jsonify({"ok": False, "error": "domain is required"}), 400 return tracker_favicon(domain) @bp.get("/torrent-stats") def torrent_stats_get(): profile = preferences.active_profile() if not profile: return ok({"stats": {}, "error": "No profile"}) force = str(request.args.get("force") or "").lower() in {"1", "true", "yes"} try: # Note: Heavy file metadata is served from a 15-minute DB cache unless the user explicitly refreshes it. return ok({"stats": torrent_stats.get(profile, force=force)}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 500 @bp.get("/torrents//files") def torrent_files(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 return ok({"files": rtorrent.torrent_files(profile, torrent_hash)}) @bp.get("/torrents//files//mediainfo") def torrent_file_media_info(torrent_hash: str, file_index: int): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 try: # Note: The route is additive and keeps all existing file endpoints unchanged. media_info = rtorrent.torrent_file_media_info(profile, torrent_hash, file_index) if media_info.get("kind") == "pdf": link = pdf_preview_links.create_pdf_preview_link( torrent_hash, file_index, int(profile.get("id") or 0), int(default_user_id() or 0), ) # Note: The frontend receives an in-app temporary URL instead of exposing the API download endpoint in the new-tab action. media_info["preview_url"] = url_for("main.pdf_preview", token=link["token"]) media_info["preview_expires_in"] = link["expires_in"] return ok({"media_info": media_info}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.post("/torrents//files/priority") def torrent_file_priority(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 data = request.get_json(silent=True) or {} files = data.get("files") or [] if not isinstance(files, list) or not files: return jsonify({"ok": False, "error": "No files selected"}), 400 result = rtorrent.set_file_priorities(profile, torrent_hash, files) status = 207 if result.get("errors") else 200 return ok(result), status @bp.get("/torrents//files/tree") def torrent_file_tree(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 return ok({"tree": rtorrent.torrent_file_tree(profile, torrent_hash)}) @bp.post("/torrents//files/folder-priority") def torrent_folder_priority(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 data = request.get_json(silent=True) or {} result = rtorrent.set_folder_priority(profile, torrent_hash, str(data.get("path") or ""), int(data.get("priority") or 0)) status = 207 if result.get("errors") else 200 return ok(result), status def _attachment_headers(download_name: str, content_type: str = "application/octet-stream", disposition: str = "attachment") -> dict: safe = Path(download_name or "download.bin").name or "download.bin" safe_disposition = "inline" if disposition == "inline" else "attachment" return { "Content-Type": content_type, "Content-Disposition": f"{safe_disposition}; filename*=UTF-8''{quote(safe)}", "X-Content-Type-Options": "nosniff", } def _cleanup_staged_file(profile: dict, path: str, local: bool = False) -> None: if local: try: Path(path).unlink() except Exception: pass return rtorrent._remote_remove_staged(profile, path) try: tmp_prefix = str(PYTORRENT_TMP_DIR).rstrip("/") + "/pytorrent-download-" if str(path).startswith(tmp_prefix) and Path(path).exists(): Path(path).unlink() except Exception: pass def _read_staged_file(profile: dict, path: str, local: bool = False) -> bytes: if local: return Path(path).read_bytes() chunks = [] for chunk in rtorrent.iter_remote_file_chunks(profile, path): if chunk: chunks.append(bytes(chunk)) return b"".join(chunks) def _send_staged_file(profile: dict, path: str, download_name: str, local: bool = False): headers = _attachment_headers(download_name, "application/x-bittorrent") if local: data = Path(path).read_bytes() _cleanup_staged_file(profile, path, local=True) headers["Content-Length"] = str(len(data)) return Response(data, headers=headers) def generate(): try: yield from rtorrent.iter_remote_file_chunks(profile, path) finally: _cleanup_staged_file(profile, path, local=False) return Response(stream_with_context(generate()), headers=headers, direct_passthrough=True) @bp.post("/torrents//files//download-link") def torrent_file_download_link(torrent_hash: str, file_index: int): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 try: # Note: The API validates the file selection before returning a short-lived in-app /download URL to the UI. rtorrent.torrent_download_file_info(profile, torrent_hash, file_index) link = pdf_preview_links.create_file_download_link(torrent_hash, file_index, int(profile.get("id") or 0), int(default_user_id() or 0)) return ok({"url": url_for("main.temporary_download", token=link["token"]), "expires_in": link["expires_in"]}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.post("/torrents//files/download-link") def torrent_file_download_link_from_body(torrent_hash: str): data = request.get_json(silent=True) or {} try: file_index = int(data.get("file_index")) except Exception: return jsonify({"ok": False, "error": "file_index is required"}), 400 return torrent_file_download_link(torrent_hash, file_index) @bp.post("/torrents//files/download.zip/link") def torrent_files_download_zip_link(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 data = request.get_json(silent=True) or {} try: indexes = data.get("indexes") or None # Note: ZIP link creation validates the requested files through the same service used by the direct download endpoint. rtorrent.torrent_download_zip_items(profile, torrent_hash, indexes) link = pdf_preview_links.create_file_zip_download_link(torrent_hash, indexes, int(profile.get("id") or 0), int(default_user_id() or 0)) return ok({"url": url_for("main.temporary_download", token=link["token"]), "expires_in": link["expires_in"]}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.get("/torrents//torrent-file/link") def torrent_file_export_link(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 try: # Note: Export availability is checked before the UI receives a temporary /download URL. item = rtorrent.export_torrent_file(profile, torrent_hash) _cleanup_staged_file(profile, item["path"], bool(item.get("local"))) link = pdf_preview_links.create_torrent_file_download_link(torrent_hash, int(profile.get("id") or 0), int(default_user_id() or 0)) return ok({"url": url_for("main.temporary_download", token=link["token"]), "expires_in": link["expires_in"]}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.post("/torrents/torrent-files.zip/link") def torrent_files_export_zip_link(): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 data = request.get_json(silent=True) or {} hashes = [str(h) for h in (data.get("hashes") or []) if str(h).strip()] if not hashes: return jsonify({"ok": False, "error": "No torrents selected"}), 400 try: # Note: Each hash is checked before the temporary ZIP export link is returned to the UI. staged_paths = [] try: for h in hashes: item = rtorrent.export_torrent_file(profile, h) staged_paths.append((item["path"], bool(item.get("local")))) finally: for path, is_local in staged_paths: _cleanup_staged_file(profile, path, is_local) link = pdf_preview_links.create_torrent_files_zip_download_link(hashes, int(profile.get("id") or 0), int(default_user_id() or 0)) return ok({"url": url_for("main.temporary_download", token=link["token"]), "expires_in": link["expires_in"]}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.get("/torrents//files//download") def torrent_file_download(torrent_hash: str, file_index: int): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 try: item = rtorrent.torrent_download_file_info(profile, torrent_hash, file_index) size = int(item.get("size") or 0) download_name = item.get("download_name") or "file.bin" inline_pdf = str(request.args.get("disposition") or "").lower() == "inline" and Path(download_name).suffix.lower() == ".pdf" # Note: Inline mode is limited to PDFs so the existing download behavior remains unchanged for every other file type. headers = _attachment_headers(download_name, "application/pdf" if inline_pdf else "application/octet-stream", "inline" if inline_pdf else "attachment") if size > 0: headers["Content-Length"] = str(size) def generate(): yield from rtorrent.iter_remote_file_chunks(profile, item["remote_path"], size=size or None) return Response(stream_with_context(generate()), headers=headers, direct_passthrough=True) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 class _ZipStream: def __init__(self): self.queue: queue.Queue[bytes | None] = queue.Queue(maxsize=16) self.closed = False def write(self, data): if not data: return 0 payload = bytes(data) self.queue.put(payload) return len(payload) def flush(self): return None def close(self): if not self.closed: self.closed = True self.queue.put(None) def writable(self): return True def _safe_zip_name(name: str, fallback: str) -> str: value = str(name or fallback).replace("\\", "/").lstrip("/") parts = [part for part in value.split("/") if part not in ("", ".", "..")] return "/".join(parts) or fallback def _stream_torrent_files_zip(profile: dict, items: list[dict]): writer = _ZipStream() errors: list[BaseException] = [] def produce(): try: with zipfile.ZipFile(writer, "w", compression=zipfile.ZIP_STORED, allowZip64=True) as archive: used = set() for item in items: arcname = _safe_zip_name(str(item.get("path") or ""), f"file-{item.get('index', 0)}") base = arcname counter = 2 while arcname in used: stem = Path(base).stem or "file" suffix = Path(base).suffix parent = str(Path(base).parent).replace(".", "", 1).strip("/") candidate = f"{stem}-{counter}{suffix}" arcname = f"{parent}/{candidate}" if parent else candidate counter += 1 used.add(arcname) info = zipfile.ZipInfo(arcname) info.compress_type = zipfile.ZIP_STORED info.file_size = int(item.get("size") or 0) with archive.open(info, "w", force_zip64=True) as dest: for chunk in rtorrent.iter_remote_file_chunks(profile, item["remote_path"], size=int(item.get("size") or 0) or None): dest.write(chunk) except BaseException as exc: errors.append(exc) finally: writer.close() threading.Thread(target=produce, name="pytorrent-zip-stream", daemon=True).start() while True: chunk = writer.queue.get() if chunk is None: break yield chunk if errors: raise errors[0] @bp.post("/torrents//files/download.zip") def torrent_files_download_zip(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 data = request.get_json(silent=True) or {} try: items = rtorrent.torrent_download_zip_items(profile, torrent_hash, data.get("indexes") or None) headers = _attachment_headers(f"{torrent_hash[:12]}-files.zip", "application/zip") headers["X-PyTorrent-Download-Mode"] = "rtorrent-stream" return Response(stream_with_context(_stream_torrent_files_zip(profile, items)), headers=headers, direct_passthrough=True) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.get("/torrents//torrent-file") def torrent_file_export(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 try: item = rtorrent.export_torrent_file(profile, torrent_hash) return _send_staged_file(profile, item["path"], item["download_name"], bool(item.get("local"))) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.post("/torrents/torrent-files.zip") def torrent_files_export_zip(): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 data = request.get_json(silent=True) or {} hashes = [str(h) for h in (data.get("hashes") or []) if str(h).strip()] if not hashes: return jsonify({"ok": False, "error": "No torrents selected"}), 400 staged_paths = [] PYTORRENT_TMP_DIR.mkdir(parents=True, exist_ok=True) tmp = tempfile.NamedTemporaryFile(prefix="pytorrent-torrents-", suffix=".zip", delete=False, dir=str(PYTORRENT_TMP_DIR)) tmp.close() try: with zipfile.ZipFile(tmp.name, "w", compression=zipfile.ZIP_DEFLATED, allowZip64=True) as archive: used_names = set() for h in hashes: item = rtorrent.export_torrent_file(profile, h) staged_paths.append((item["path"], bool(item.get("local")))) name = Path(item["download_name"]).name or f"{h}.torrent" base_name = name counter = 2 while name in used_names: stem = Path(base_name).stem name = f"{stem}-{counter}.torrent" counter += 1 used_names.add(name) archive.writestr(name, _read_staged_file(profile, item["path"], bool(item.get("local")))) response = send_file(tmp.name, as_attachment=True, download_name="pytorrent-torrents.zip") def cleanup(): for path, is_local in staged_paths: _cleanup_staged_file(profile, path, is_local) try: Path(tmp.name).unlink() except Exception: pass response.call_on_close(cleanup) return response except Exception as exc: for path, is_local in staged_paths: _cleanup_staged_file(profile, path, is_local) try: Path(tmp.name).unlink() except Exception: pass return jsonify({"ok": False, "error": str(exc)}), 400 @bp.get("/torrents//chunks") def torrent_chunks(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 try: max_cells = min(10000, max(64, int(request.args.get("max_cells") or 2048))) return ok({"chunks": rtorrent.torrent_chunks(profile, torrent_hash, max_cells=max_cells)}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.post("/torrents//chunks/") def torrent_chunk_action(torrent_hash: str, action_name: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 try: # Note: Chunk actions are intentionally limited to rTorrent-safe operations; XML-RPC has no supported single-piece redownload call. result = rtorrent.torrent_chunk_action(profile, torrent_hash, action_name, request.get_json(silent=True) or {}) return ok({"result": result, "message": result.get("message") or f"Chunk action {action_name} done"}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.get("/torrents//peers") def torrent_peers(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 peers = rtorrent.torrent_peers(profile, torrent_hash) for peer in peers: peer.update(lookup_ip(peer.get("ip", ""))) prefs = preferences.get_preferences(profile_id=profile.get("id")) if int(prefs.get("reverse_dns_enabled") or 0): # Note: PTR hostnames are attached only when the user enables the lightweight cached resolver. attach_reverse_dns(peers) return ok({"peers": peers}) @bp.get("/torrents//trackers") def torrent_trackers(torrent_hash: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 return ok({"trackers": rtorrent.torrent_trackers(profile, torrent_hash)}) @bp.post("/torrents//trackers/") def torrent_tracker_action(torrent_hash: str, action_name: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 try: result = rtorrent.tracker_action(profile, torrent_hash, action_name, request.get_json(silent=True) or {}) return ok({"result": result, "message": f"Tracker {action_name} via {result.get('method', 'XMLRPC')}"}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.post("/torrents/") def torrent_action(action_name: str): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 data = request.get_json(silent=True) or {} allowed = {"start", "pause", "unpause", "stop", "resume", "recheck", "reannounce", "remove", "move", "set_label", "set_ratio_group"} if action_name not in allowed: return jsonify({"ok": False, "error": "Unknown action"}), 400 if action_name in {"move", "remove"}: # Note: Large move/remove requests are split into ordered bulk parts; smaller requests keep the old single-job response shape. jobs = enqueue_bulk_parts(profile, action_name, data) first_job_id = jobs[0]["job_id"] if jobs else None total_hashes = sum(int(job.get("hash_count") or 0) for job in jobs) return ok({ "job_id": first_job_id, "job_ids": [job["job_id"] for job in jobs], "jobs": jobs, "hash_count": total_hashes, "bulk": total_hashes > 1, "bulk_parts": len(jobs), "chunk_size": MOVE_BULK_MAX_HASHES, }) payload = enrich_bulk_payload(profile, action_name, data) job_id = enqueue(action_name, profile["id"], payload) return ok({"job_id": job_id, "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1}) @bp.post("/torrents/create") def torrent_create(): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 form = request.form if request.content_type and request.content_type.startswith("multipart/form-data") else (request.get_json(silent=True) or {}) try: created = torrent_creator.build_torrent( source_path=form.get("source_path", ""), trackers=form.get("trackers", ""), comment=form.get("comment", ""), source=form.get("source", ""), piece_size_kib=form.get("piece_size_kib", 256), private=str(form.get("private", "0")).lower() in {"1", "true", "on", "yes"}, ) share = str(form.get("share", "0")).lower() in {"1", "true", "on", "yes"} if share: size_check = rtorrent.validate_torrent_upload_size(profile, created["data"], True, created["source_parent"], form.get("label", "")) if not size_check.get("ok"): return jsonify({"ok": False, "error": f"Created torrent is too large for the current rTorrent XML-RPC limit: request {size_check['request_h']} > limit {size_check['limit_h']}. Change {size_check['setting']}.set to e.g. {size_check['suggested_value']} in rTorrent settings.", "xmlrpc_limit": size_check}), 413 rtorrent.add_torrent_raw(profile, created["data"], True, created["source_parent"], form.get("label", "")) headers = _attachment_headers(created["filename"], "application/x-bittorrent") headers["Content-Length"] = str(len(created["data"])) headers["X-PyTorrent-Info-Hash"] = created["info_hash"] headers["X-PyTorrent-Create-Message"] = f"Created {created['filename']} ({created['file_count']} file(s))" return Response(created["data"], headers=headers) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.post("/torrents/add") def torrent_add(): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 job_ids = [] if request.content_type and request.content_type.startswith("multipart/form-data"): start = request.form.get("start", "1") in {"1", "true", "on", "yes"} directory = request.form.get("directory", "") or active_default_download_path(profile) label = request.form.get("label", "") uris = [x.strip() for x in request.form.get("uris", "").splitlines() if x.strip()] for uri in uris: job_ids.append(enqueue("add_magnet", profile["id"], {"uri": uri, "start": start, "directory": directory, "label": label})) existing_hashes = {str(t.get("hash") or "").upper() for t in torrent_cache.snapshot(profile["id"])} try: priority_payload = json.loads(request.form.get("file_priorities") or "{}") except Exception: priority_payload = {} allow_duplicates = request.form.get("allow_duplicates", "0") in {"1", "true", "on", "yes"} skipped_duplicates = [] for uploaded in request.files.getlist("files"): raw = uploaded.read() meta = parse_torrent(raw) info_hash = str(meta.get("info_hash") or "").upper() filename = uploaded.filename or meta.get("name") or info_hash if info_hash and info_hash in existing_hashes and not allow_duplicates: skipped_duplicates.append({"filename": filename, "info_hash": info_hash}) continue file_priorities = [] if isinstance(priority_payload, dict): file_priorities = priority_payload.get(filename) or priority_payload.get(info_hash) or [] elif isinstance(priority_payload, list): file_priorities = priority_payload size_check = rtorrent.validate_torrent_upload_size(profile, raw, start, directory, label, file_priorities or None) if not size_check.get("ok"): return jsonify({ "ok": False, "error": ( f"Torrent file is too large for the current rTorrent XML-RPC limit: " f"request {size_check['request_h']} > limit {size_check['limit_h']}. " f"Change {size_check['setting']}.set to e.g. {size_check['suggested_value']} in rTorrent settings." ), "xmlrpc_limit": size_check, }), 413 data_b64 = base64.b64encode(raw).decode("ascii") job_ids.append(enqueue("add_torrent_raw", profile["id"], {"filename": filename, "data_b64": data_b64, "start": start, "directory": directory, "label": label, "file_priorities": file_priorities or None})) return ok({"job_ids": job_ids, "skipped_duplicates": skipped_duplicates}) data = request.get_json(silent=True) or {} uris = data.get("uris") or [] if isinstance(uris, str): uris = [x.strip() for x in uris.splitlines() if x.strip()] for uri in uris: job_ids.append(enqueue("add_magnet", profile["id"], {"uri": uri, "start": data.get("start", True), "directory": data.get("directory", "") or active_default_download_path(profile), "label": data.get("label", "")})) return ok({"job_ids": job_ids}) @bp.post("/torrents/preview") def torrent_preview(): profile = preferences.active_profile() existing_hashes = set() if profile: try: existing_hashes = {str(t.get("hash") or "").upper() for t in torrent_cache.snapshot(profile["id"])} except Exception: existing_hashes = set() previews = [] xmlrpc_limit = rtorrent.xmlrpc_size_limit(profile) if profile else None try: uploads = request.files.getlist("files") if request.content_type and request.content_type.startswith("multipart/form-data") else [] for uploaded in uploads: raw = uploaded.read() meta = parse_torrent(raw) meta["filename"] = uploaded.filename meta["duplicate"] = bool(meta.get("info_hash") and meta["info_hash"].upper() in existing_hashes) if profile: size_check = rtorrent.validate_torrent_upload_size(profile, raw) meta["xmlrpc_request_bytes"] = size_check["request_bytes"] meta["xmlrpc_request_h"] = size_check["request_h"] meta["xmlrpc_too_large"] = not size_check.get("ok") previews.append(meta) return ok({"previews": previews, "xmlrpc_limit": xmlrpc_limit}) except Exception as exc: return jsonify({"ok": False, "error": str(exc)}), 400 @bp.post("/speed/limits") def speed_limits(): profile = preferences.active_profile() if not profile: return jsonify({"ok": False, "error": "No profile"}), 400 data = request.get_json(silent=True) or {} job_id = enqueue("set_limits", profile["id"], {"down": data.get("down"), "up": data.get("up")}) return ok({"job_id": job_id}) def _user_disk_status(profile: dict) -> dict: # Note: Disk usage is user-preference aware, so it is read separately from the shared Socket.IO poller. prefs = preferences.get_disk_monitor_preferences(profile.get("id") if profile else None) try: paths = json.loads((prefs or {}).get("disk_monitor_paths_json") or "[]") if prefs else [] except Exception: paths = [] return rtorrent.disk_usage_for_paths( profile, paths, (prefs or {}).get("disk_monitor_mode") or "default", (prefs or {}).get("disk_monitor_selected_path") or "", )