From d55533d78ab579727e05656fa0afb7d6a28349da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Mon, 4 May 2026 20:12:26 +0200 Subject: [PATCH] bulk-part-jobs --- pytorrent/routes/api.py | 52 +++++++++++++++++++++++++++++++++++ pytorrent/routes/main.py | 2 +- pytorrent/services/workers.py | 3 ++ pytorrent/static/app.js | 10 +++++-- 4 files changed, 64 insertions(+), 3 deletions(-) diff --git a/pytorrent/routes/api.py b/pytorrent/routes/api.py index 5bc0ee3..95f14d2 100644 --- a/pytorrent/routes/api.py +++ b/pytorrent/routes/api.py @@ -24,6 +24,8 @@ from ..services.geoip import lookup_ip bp = Blueprint("api", __name__, url_prefix="/api") +MOVE_BULK_MAX_HASHES = 100 + def ok(payload=None): data = {"ok": True} @@ -303,6 +305,42 @@ def enrich_bulk_payload(profile: dict, action_name: str, data: dict) -> dict: return payload +def _chunk_hashes(hashes: list[str], size: int = MOVE_BULK_MAX_HASHES) -> list[list[str]]: + # Note: Splits very large torrent selections into predictable chunks so each queued job stays small and recoverable. + safe_size = max(1, int(size or MOVE_BULK_MAX_HASHES)) + return [hashes[index:index + safe_size] for index in range(0, len(hashes), safe_size)] + + +def enqueue_move_bulk_parts(profile: dict, data: dict) -> list[dict]: + # Note: Keeps the existing move action intact for normal batches, while large moves are queued as bulk-1, bulk-2, etc. + base_payload = enrich_bulk_payload(profile, "move", data) + hashes = base_payload.get("hashes") or [] + chunks = _chunk_hashes(hashes) + if len(chunks) <= 1: + job_id = enqueue("move", profile["id"], base_payload) + return [{"job_id": job_id, "label": "bulk-1", "part": 1, "parts": 1, "hashes": hashes, "hash_count": len(hashes)}] + + jobs = [] + items_by_hash = {str(item.get("hash")): item for item in (base_payload.get("job_context") or {}).get("items") or []} + for index, chunk in enumerate(chunks, start=1): + payload = dict(base_payload) + payload["hashes"] = chunk + context = dict(base_payload.get("job_context") or {}) + context.update({ + "bulk": True, + "bulk_label": f"bulk-{index}", + "bulk_part": index, + "bulk_parts": len(chunks), + "hash_count": len(chunk), + "parent_hash_count": len(hashes), + "items": [items_by_hash[h] for h in chunk if h in items_by_hash], + }) + payload["job_context"] = context + job_id = enqueue("move", profile["id"], payload) + jobs.append({"job_id": job_id, "label": context["bulk_label"], "part": index, "parts": len(chunks), "hashes": chunk, "hash_count": len(chunk)}) + return jobs + + @bp.get("/profiles") def profiles_list(): return ok({"profiles": preferences.list_profiles(), "active": preferences.active_profile()}) @@ -437,6 +475,20 @@ def torrent_action(action_name: str): allowed = {"start", "pause", "stop", "resume", "recheck", "reannounce", "remove", "move", "set_label", "set_ratio_group"} if action_name not in allowed: return jsonify({"ok": False, "error": "Unknown action"}), 400 + if action_name == "move": + # Note: Large move requests are split into ordered bulk parts; smaller requests keep the old single-job response shape. + jobs = enqueue_move_bulk_parts(profile, data) + first_job_id = jobs[0]["job_id"] if jobs else None + total_hashes = sum(int(job.get("hash_count") or 0) for job in jobs) + return ok({ + "job_id": first_job_id, + "job_ids": [job["job_id"] for job in jobs], + "jobs": jobs, + "hash_count": total_hashes, + "bulk": total_hashes > 1, + "bulk_parts": len(jobs), + "chunk_size": MOVE_BULK_MAX_HASHES, + }) payload = enrich_bulk_payload(profile, action_name, data) job_id = enqueue(action_name, profile["id"], payload) return ok({"job_id": job_id, "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1}) diff --git a/pytorrent/routes/main.py b/pytorrent/routes/main.py index 57c6e95..f9d8b9f 100644 --- a/pytorrent/routes/main.py +++ b/pytorrent/routes/main.py @@ -55,7 +55,7 @@ def openapi(): }, }, "/api/torrents": {"get": {"summary": "Get cached torrent snapshot", "responses": {"200": {"description": "Torrent list"}}}}, - "/api/torrents/{action_name}": {"post": {"summary": "Queue torrent action", "description": "For move, path is the target directory; move_data=true physically moves data on the rTorrent host using a detached shell move with status polling, force-overwrites an existing destination, tolerates rTorrent execute timeouts around mkdir/start/polling, handles retries after a partially completed move, avoids SCGI timeout on long mv operations, and recheck defaults to move_data. Move and remove jobs are ordered per profile, so a later remove waits for earlier move/remove jobs to finish.", "parameters": [{"name": "action_name", "in": "path", "required": True, "schema": {"type": "string", "enum": ["start", "pause", "stop", "resume", "recheck", "remove", "move", "set_label", "set_ratio_group"]}}], "requestBody": {"content": {"application/json": {"schema": {"type": "object", "properties": {"hashes": {"type": "array", "items": {"type": "string"}}, "path": {"type": "string", "description": "Target directory for move"}, "move_data": {"type": "boolean", "description": "Physically move data before setting torrent directory"}, "recheck": {"type": "boolean", "description": "Run hash check after physical move; defaults to move_data"}, "label": {"type": "string"}, "ratio_group": {"type": "string"}, "remove_data": {"type": "boolean"}}}}}}, "responses": {"200": {"description": "Job queued"}}}}, + "/api/torrents/{action_name}": {"post": {"summary": "Queue torrent action", "description": "For move, path is the target directory; move_data=true physically moves data on the rTorrent host using a detached shell move with status polling, force-overwrites an existing destination, tolerates rTorrent execute timeouts around mkdir/start/polling, handles retries after a partially completed move, avoids SCGI timeout on long mv operations, and recheck defaults to move_data. Large move selections are split into ordered bulk parts of up to 100 hashes. Move and remove jobs are ordered per profile, so a later remove waits for earlier move/remove jobs to finish.", "parameters": [{"name": "action_name", "in": "path", "required": True, "schema": {"type": "string", "enum": ["start", "pause", "stop", "resume", "recheck", "remove", "move", "set_label", "set_ratio_group"]}}], "requestBody": {"content": {"application/json": {"schema": {"type": "object", "properties": {"hashes": {"type": "array", "items": {"type": "string"}}, "path": {"type": "string", "description": "Target directory for move"}, "move_data": {"type": "boolean", "description": "Physically move data before setting torrent directory"}, "recheck": {"type": "boolean", "description": "Run hash check after physical move; defaults to move_data"}, "label": {"type": "string"}, "ratio_group": {"type": "string"}, "remove_data": {"type": "boolean"}}}}}}, "responses": {"200": {"description": "Job queued"}}}}, "/api/torrents/add": {"post": {"summary": "Add magnet links or torrent files", "requestBody": {"content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"uris": {"type": "string"}, "directory": {"type": "string"}, "label": {"type": "string"}, "start": {"type": "boolean"}, "files": {"type": "array", "items": {"type": "string", "format": "binary"}}}}}, "application/json": {"schema": {"type": "object"}}}}, "responses": {"200": {"description": "Jobs queued"}}}}, "/api/torrents/{torrent_hash}/files": {"get": {"summary": "Torrent files", "parameters": [{"name": "torrent_hash", "in": "path", "required": True, "schema": {"type": "string"}}], "responses": {"200": {"description": "Files"}}}}, "/api/torrents/{torrent_hash}/peers": {"get": {"summary": "Torrent peers with GeoIP", "parameters": [{"name": "torrent_hash", "in": "path", "required": True, "schema": {"type": "string"}}], "responses": {"200": {"description": "Peers"}}}}, diff --git a/pytorrent/services/workers.py b/pytorrent/services/workers.py index 1c9711f..560da78 100644 --- a/pytorrent/services/workers.py +++ b/pytorrent/services/workers.py @@ -189,6 +189,9 @@ def _job_summary(row: dict, payload: dict, result: dict) -> str: ctx = payload.get("job_context") or {} count = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0) parts = [] + if ctx.get("bulk_label"): + # Note: Shows which generated bulk part is being displayed in the job queue. + parts.append(f"{ctx.get('bulk_label')} of {ctx.get('bulk_parts')}") if count: parts.append(("bulk " if count > 1 else "single ") + f"{count} torrent(s)") if ctx.get("target_path"): diff --git a/pytorrent/static/app.js b/pytorrent/static/app.js index 4fff591..51ae53e 100644 --- a/pytorrent/static/app.js +++ b/pytorrent/static/app.js @@ -179,6 +179,12 @@ [...new Set(hashes||[])].filter(Boolean).forEach(hash=>activeOperations.set(hash,{action,jobId,state,label,updatedAt:Date.now()})); scheduleRender(true); } + function markQueuedJobs(response, fallbackHashes, action){ + // Note: Supports API responses that split one large user action into multiple queued bulk parts. + const jobs=Array.isArray(response?.jobs)?response.jobs:[]; + if(jobs.length){ jobs.forEach(job=>markTorrentOperation(job.hashes||[],action,job.job_id,'queued')); return; } + markTorrentOperation(fallbackHashes,action,response?.job_id,'queued'); + } function clearJobOperation(jobId, hashes=[]){ if(jobId){ [...activeOperations].forEach(([hash,op])=>{ if(op.jobId===jobId) activeOperations.delete(hash); }); } (hashes||[]).forEach(hash=>activeOperations.delete(hash)); @@ -211,7 +217,7 @@ function setSelectionRange(hash, keepExisting=false){ const current=visibleRows.findIndex(t=>t.hash===hash); const last=visibleRows.findIndex(t=>t.hash===lastSelectedHash); if(current<0 || last<0){ selected.add(hash); lastSelectedHash=hash; return; } if(!keepExisting) selected.clear(); const a=Math.min(current,last), b=Math.max(current,last); visibleRows.slice(a,b+1).forEach(t=>selected.add(t.hash)); selectedHash=hash; } async function post(url,data,method='POST'){ const res=await fetch(url,{method,headers:{'Content-Type':'application/json'},body:JSON.stringify(data||{})}); const json=await res.json(); if(!json.ok) throw new Error(json.error||'Operation failed'); return json; } - async function runAction(action, extra={}){ const hashes=selectedHashes(); if(!hashes.length) return toast('No torrents selected','warning'); let payload={hashes,...extra}; if(action==='move'){ openPathPicker('move'); return; } setBusy(true); try{ const j=await post(`/api/torrents/${action}`,payload); markTorrentOperation(hashes, action, j.job_id, 'queued'); if(action==='recheck'){ hashes.forEach(h=>{ const t=torrents.get(h); if(t) torrents.set(h,{...t,status:'Checking',hashing:1,message:'Force recheck queued'}); }); scheduleRender(true); } toast(`${action} queued`,'success'); if(action==='set_label') await loadLabels(); }catch(e){toast(e.message,'danger');} finally{setBusy(false);} } + async function runAction(action, extra={}){ const hashes=selectedHashes(); if(!hashes.length) return toast('No torrents selected','warning'); let payload={hashes,...extra}; if(action==='move'){ openPathPicker('move'); return; } setBusy(true); try{ const j=await post(`/api/torrents/${action}`,payload); markQueuedJobs(j, hashes, action); if(action==='recheck'){ hashes.forEach(h=>{ const t=torrents.get(h); if(t) torrents.set(h,{...t,status:'Checking',hashing:1,message:'Force recheck queued'}); }); scheduleRender(true); } toast(`${action} queued`,'success'); if(action==='set_label') await loadLabels(); }catch(e){toast(e.message,'danger');} finally{setBusy(false);} } function flag(iso){ const code=String(iso||'').toLowerCase(); return code?` ${esc(code.toUpperCase())}`:'-'; } function table(headers,rows){ return `${headers.map(h=>``).join('')}${rows.map(r=>`${r.map(c=>``).join('')}`).join('')}
${esc(h)}
${c}
`; } function renderGeneral(){ const t=torrents.get(selectedHash); const labels=t?labelNames(t.label).map(l=>` ${esc(l)}`).join(' '):''; $('detailPane').innerHTML=t?`
Name${esc(t.name)}
Hash${esc(t.hash)}
Path${esc(t.path)}
Size${esc(t.size_h)}
Progress${esc(t.progress)}%
Ratio${esc(t.ratio)}
Downloaded${esc(t.down_total_h)}
Uploaded${esc(t.up_total_h)}
Labels${labels||'-'}
Ratio group${esc(t.ratio_group||'')}
`:'Select a torrent.'; } @@ -305,7 +311,7 @@ async function applyDefaultDownloadPath(force=false){ const p=await getDefaultDownloadPath(); ['addPath','rssPath','autoEffectPath'].forEach(id=>{ const el=$(id); if(el && (force || !el.value)) el.value=p; }); return p; } async function openPathPicker(target){ pathTarget=target; const def=await getDefaultDownloadPath(); const initial=def || ($(target)?.value||'/'); $('moveOptions')?.classList.toggle('d-none', target!=='move'); if($('moveDataPhysical')) $('moveDataPhysical').checked=true; if($('moveRecheck')) $('moveRecheck').checked=true; new bootstrap.Modal($('pathModal')).show(); browsePath(initial); } async function browsePath(path){ $('pathList').innerHTML=' Loading...'; try{ const res=await fetch(`/api/path/browse?path=${encodeURIComponent(path||'/')}`); const j=await res.json(); if(!j.ok) throw new Error(j.error); $('pathCurrent').value=j.path; lastPathParent=j.parent; $('pathList').innerHTML=j.dirs.map(d=>`
${esc(d.name)}
`).join('')||'
No directories.
'; }catch(e){$('pathList').innerHTML=`
${esc(e.message)}
`;} } - $('pathList')?.addEventListener('click',e=>{const r=e.target.closest('.path-row'); if(r) browsePath(r.dataset.path);}); $('pathGoBtn')?.addEventListener('click',()=>browsePath($('pathCurrent').value)); $('pathUpBtn')?.addEventListener('click',()=>browsePath(lastPathParent)); $('pathReloadBtn')?.addEventListener('click',()=>browsePath($('pathCurrent').value)); $('pathSelectBtn')?.addEventListener('click',async()=>{const p=$('pathCurrent').value; if(pathTarget==='move'){ const hashes=selectedHashes(); const j=await post('/api/torrents/move',{hashes,path:p,move_data:!!($('moveDataPhysical')?.checked),recheck:!!($('moveRecheck')?.checked)}); markTorrentOperation(hashes,'move',j.job_id,'queued'); toast($('moveDataPhysical')?.checked?'physical move queued':'move queued','success'); } else if($(pathTarget)) $(pathTarget).value=p; bootstrap.Modal.getInstance($('pathModal'))?.hide();}); document.querySelectorAll('.browse-path').forEach(b=>b.addEventListener('click',()=>openPathPicker(b.dataset.target))); + $('pathList')?.addEventListener('click',e=>{const r=e.target.closest('.path-row'); if(r) browsePath(r.dataset.path);}); $('pathGoBtn')?.addEventListener('click',()=>browsePath($('pathCurrent').value)); $('pathUpBtn')?.addEventListener('click',()=>browsePath(lastPathParent)); $('pathReloadBtn')?.addEventListener('click',()=>browsePath($('pathCurrent').value)); $('pathSelectBtn')?.addEventListener('click',async()=>{const p=$('pathCurrent').value; if(pathTarget==='move'){ const hashes=selectedHashes(); const j=await post('/api/torrents/move',{hashes,path:p,move_data:!!($('moveDataPhysical')?.checked),recheck:!!($('moveRecheck')?.checked)}); markQueuedJobs(j,hashes,'move'); const parts=Number(j.bulk_parts||1); toast(parts>1?`move queued in ${parts} bulk parts`:$('moveDataPhysical')?.checked?'physical move queued':'move queued','success'); } else if($(pathTarget)) $(pathTarget).value=p; bootstrap.Modal.getInstance($('pathModal'))?.hide();}); document.querySelectorAll('.browse-path').forEach(b=>b.addEventListener('click',()=>openPathPicker(b.dataset.target))); function renderColumnManager(){ const box=$('columnManager'); if(!box) return; box.innerHTML=COLUMN_DEFS.map(([key,label])=>``).join(''); } $('saveColumnsBtn')?.addEventListener('click',async()=>{ document.querySelectorAll('.column-toggle').forEach(cb=>cb.checked?hiddenColumns.delete(cb.dataset.colKey):hiddenColumns.add(cb.dataset.colKey)); applyColumnVisibility(); scheduleRender(true); await post('/api/preferences',{table_columns_json:JSON.stringify({hidden:[...hiddenColumns]})}).catch(e=>toast(e.message,'danger')); toast('Columns saved','success'); });