diff --git a/pytorrent/routes/api.py b/pytorrent/routes/api.py index 5bc0ee3..95f14d2 100644 --- a/pytorrent/routes/api.py +++ b/pytorrent/routes/api.py @@ -24,6 +24,8 @@ from ..services.geoip import lookup_ip bp = Blueprint("api", __name__, url_prefix="/api") +MOVE_BULK_MAX_HASHES = 100 + def ok(payload=None): data = {"ok": True} @@ -303,6 +305,42 @@ def enrich_bulk_payload(profile: dict, action_name: str, data: dict) -> dict: return payload +def _chunk_hashes(hashes: list[str], size: int = MOVE_BULK_MAX_HASHES) -> list[list[str]]: + # Note: Splits very large torrent selections into predictable chunks so each queued job stays small and recoverable. + safe_size = max(1, int(size or MOVE_BULK_MAX_HASHES)) + return [hashes[index:index + safe_size] for index in range(0, len(hashes), safe_size)] + + +def enqueue_move_bulk_parts(profile: dict, data: dict) -> list[dict]: + # Note: Keeps the existing move action intact for normal batches, while large moves are queued as bulk-1, bulk-2, etc. + base_payload = enrich_bulk_payload(profile, "move", data) + hashes = base_payload.get("hashes") or [] + chunks = _chunk_hashes(hashes) + if len(chunks) <= 1: + job_id = enqueue("move", profile["id"], base_payload) + return [{"job_id": job_id, "label": "bulk-1", "part": 1, "parts": 1, "hashes": hashes, "hash_count": len(hashes)}] + + jobs = [] + items_by_hash = {str(item.get("hash")): item for item in (base_payload.get("job_context") or {}).get("items") or []} + for index, chunk in enumerate(chunks, start=1): + payload = dict(base_payload) + payload["hashes"] = chunk + context = dict(base_payload.get("job_context") or {}) + context.update({ + "bulk": True, + "bulk_label": f"bulk-{index}", + "bulk_part": index, + "bulk_parts": len(chunks), + "hash_count": len(chunk), + "parent_hash_count": len(hashes), + "items": [items_by_hash[h] for h in chunk if h in items_by_hash], + }) + payload["job_context"] = context + job_id = enqueue("move", profile["id"], payload) + jobs.append({"job_id": job_id, "label": context["bulk_label"], "part": index, "parts": len(chunks), "hashes": chunk, "hash_count": len(chunk)}) + return jobs + + @bp.get("/profiles") def profiles_list(): return ok({"profiles": preferences.list_profiles(), "active": preferences.active_profile()}) @@ -437,6 +475,20 @@ def torrent_action(action_name: str): allowed = {"start", "pause", "stop", "resume", "recheck", "reannounce", "remove", "move", "set_label", "set_ratio_group"} if action_name not in allowed: return jsonify({"ok": False, "error": "Unknown action"}), 400 + if action_name == "move": + # Note: Large move requests are split into ordered bulk parts; smaller requests keep the old single-job response shape. + jobs = enqueue_move_bulk_parts(profile, data) + first_job_id = jobs[0]["job_id"] if jobs else None + total_hashes = sum(int(job.get("hash_count") or 0) for job in jobs) + return ok({ + "job_id": first_job_id, + "job_ids": [job["job_id"] for job in jobs], + "jobs": jobs, + "hash_count": total_hashes, + "bulk": total_hashes > 1, + "bulk_parts": len(jobs), + "chunk_size": MOVE_BULK_MAX_HASHES, + }) payload = enrich_bulk_payload(profile, action_name, data) job_id = enqueue(action_name, profile["id"], payload) return ok({"job_id": job_id, "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1}) diff --git a/pytorrent/routes/main.py b/pytorrent/routes/main.py index 57c6e95..f9d8b9f 100644 --- a/pytorrent/routes/main.py +++ b/pytorrent/routes/main.py @@ -55,7 +55,7 @@ def openapi(): }, }, "/api/torrents": {"get": {"summary": "Get cached torrent snapshot", "responses": {"200": {"description": "Torrent list"}}}}, - "/api/torrents/{action_name}": {"post": {"summary": "Queue torrent action", "description": "For move, path is the target directory; move_data=true physically moves data on the rTorrent host using a detached shell move with status polling, force-overwrites an existing destination, tolerates rTorrent execute timeouts around mkdir/start/polling, handles retries after a partially completed move, avoids SCGI timeout on long mv operations, and recheck defaults to move_data. Move and remove jobs are ordered per profile, so a later remove waits for earlier move/remove jobs to finish.", "parameters": [{"name": "action_name", "in": "path", "required": True, "schema": {"type": "string", "enum": ["start", "pause", "stop", "resume", "recheck", "remove", "move", "set_label", "set_ratio_group"]}}], "requestBody": {"content": {"application/json": {"schema": {"type": "object", "properties": {"hashes": {"type": "array", "items": {"type": "string"}}, "path": {"type": "string", "description": "Target directory for move"}, "move_data": {"type": "boolean", "description": "Physically move data before setting torrent directory"}, "recheck": {"type": "boolean", "description": "Run hash check after physical move; defaults to move_data"}, "label": {"type": "string"}, "ratio_group": {"type": "string"}, "remove_data": {"type": "boolean"}}}}}}, "responses": {"200": {"description": "Job queued"}}}}, + "/api/torrents/{action_name}": {"post": {"summary": "Queue torrent action", "description": "For move, path is the target directory; move_data=true physically moves data on the rTorrent host using a detached shell move with status polling, force-overwrites an existing destination, tolerates rTorrent execute timeouts around mkdir/start/polling, handles retries after a partially completed move, avoids SCGI timeout on long mv operations, and recheck defaults to move_data. Large move selections are split into ordered bulk parts of up to 100 hashes. Move and remove jobs are ordered per profile, so a later remove waits for earlier move/remove jobs to finish.", "parameters": [{"name": "action_name", "in": "path", "required": True, "schema": {"type": "string", "enum": ["start", "pause", "stop", "resume", "recheck", "remove", "move", "set_label", "set_ratio_group"]}}], "requestBody": {"content": {"application/json": {"schema": {"type": "object", "properties": {"hashes": {"type": "array", "items": {"type": "string"}}, "path": {"type": "string", "description": "Target directory for move"}, "move_data": {"type": "boolean", "description": "Physically move data before setting torrent directory"}, "recheck": {"type": "boolean", "description": "Run hash check after physical move; defaults to move_data"}, "label": {"type": "string"}, "ratio_group": {"type": "string"}, "remove_data": {"type": "boolean"}}}}}}, "responses": {"200": {"description": "Job queued"}}}}, "/api/torrents/add": {"post": {"summary": "Add magnet links or torrent files", "requestBody": {"content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"uris": {"type": "string"}, "directory": {"type": "string"}, "label": {"type": "string"}, "start": {"type": "boolean"}, "files": {"type": "array", "items": {"type": "string", "format": "binary"}}}}}, "application/json": {"schema": {"type": "object"}}}}, "responses": {"200": {"description": "Jobs queued"}}}}, "/api/torrents/{torrent_hash}/files": {"get": {"summary": "Torrent files", "parameters": [{"name": "torrent_hash", "in": "path", "required": True, "schema": {"type": "string"}}], "responses": {"200": {"description": "Files"}}}}, "/api/torrents/{torrent_hash}/peers": {"get": {"summary": "Torrent peers with GeoIP", "parameters": [{"name": "torrent_hash", "in": "path", "required": True, "schema": {"type": "string"}}], "responses": {"200": {"description": "Peers"}}}}, diff --git a/pytorrent/services/workers.py b/pytorrent/services/workers.py index 1c9711f..560da78 100644 --- a/pytorrent/services/workers.py +++ b/pytorrent/services/workers.py @@ -189,6 +189,9 @@ def _job_summary(row: dict, payload: dict, result: dict) -> str: ctx = payload.get("job_context") or {} count = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0) parts = [] + if ctx.get("bulk_label"): + # Note: Shows which generated bulk part is being displayed in the job queue. + parts.append(f"{ctx.get('bulk_label')} of {ctx.get('bulk_parts')}") if count: parts.append(("bulk " if count > 1 else "single ") + f"{count} torrent(s)") if ctx.get("target_path"): diff --git a/pytorrent/static/app.js b/pytorrent/static/app.js index 4fff591..51ae53e 100644 --- a/pytorrent/static/app.js +++ b/pytorrent/static/app.js @@ -179,6 +179,12 @@ [...new Set(hashes||[])].filter(Boolean).forEach(hash=>activeOperations.set(hash,{action,jobId,state,label,updatedAt:Date.now()})); scheduleRender(true); } + function markQueuedJobs(response, fallbackHashes, action){ + // Note: Supports API responses that split one large user action into multiple queued bulk parts. + const jobs=Array.isArray(response?.jobs)?response.jobs:[]; + if(jobs.length){ jobs.forEach(job=>markTorrentOperation(job.hashes||[],action,job.job_id,'queued')); return; } + markTorrentOperation(fallbackHashes,action,response?.job_id,'queued'); + } function clearJobOperation(jobId, hashes=[]){ if(jobId){ [...activeOperations].forEach(([hash,op])=>{ if(op.jobId===jobId) activeOperations.delete(hash); }); } (hashes||[]).forEach(hash=>activeOperations.delete(hash)); @@ -211,7 +217,7 @@ function setSelectionRange(hash, keepExisting=false){ const current=visibleRows.findIndex(t=>t.hash===hash); const last=visibleRows.findIndex(t=>t.hash===lastSelectedHash); if(current<0 || last<0){ selected.add(hash); lastSelectedHash=hash; return; } if(!keepExisting) selected.clear(); const a=Math.min(current,last), b=Math.max(current,last); visibleRows.slice(a,b+1).forEach(t=>selected.add(t.hash)); selectedHash=hash; } async function post(url,data,method='POST'){ const res=await fetch(url,{method,headers:{'Content-Type':'application/json'},body:JSON.stringify(data||{})}); const json=await res.json(); if(!json.ok) throw new Error(json.error||'Operation failed'); return json; } - async function runAction(action, extra={}){ const hashes=selectedHashes(); if(!hashes.length) return toast('No torrents selected','warning'); let payload={hashes,...extra}; if(action==='move'){ openPathPicker('move'); return; } setBusy(true); try{ const j=await post(`/api/torrents/${action}`,payload); markTorrentOperation(hashes, action, j.job_id, 'queued'); if(action==='recheck'){ hashes.forEach(h=>{ const t=torrents.get(h); if(t) torrents.set(h,{...t,status:'Checking',hashing:1,message:'Force recheck queued'}); }); scheduleRender(true); } toast(`${action} queued`,'success'); if(action==='set_label') await loadLabels(); }catch(e){toast(e.message,'danger');} finally{setBusy(false);} } + async function runAction(action, extra={}){ const hashes=selectedHashes(); if(!hashes.length) return toast('No torrents selected','warning'); let payload={hashes,...extra}; if(action==='move'){ openPathPicker('move'); return; } setBusy(true); try{ const j=await post(`/api/torrents/${action}`,payload); markQueuedJobs(j, hashes, action); if(action==='recheck'){ hashes.forEach(h=>{ const t=torrents.get(h); if(t) torrents.set(h,{...t,status:'Checking',hashing:1,message:'Force recheck queued'}); }); scheduleRender(true); } toast(`${action} queued`,'success'); if(action==='set_label') await loadLabels(); }catch(e){toast(e.message,'danger');} finally{setBusy(false);} } function flag(iso){ const code=String(iso||'').toLowerCase(); return code?` ${esc(code.toUpperCase())}`:'-'; } function table(headers,rows){ return `
| ${esc(h)} | `).join('')}
|---|
| ${c} | `).join('')}