From 1ff1525f0bb2fc505d0efe9180ed0d207c6a992f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Mon, 4 May 2026 21:08:30 +0200 Subject: [PATCH] bulk-part-jobs, and scgi retries --- .env.example | 1 + pytorrent/routes/api.py | 26 +++++--- pytorrent/services/rtorrent.py | 114 +++++++++++++++++++++++++++------ pytorrent/services/workers.py | 6 +- pytorrent/static/app.js | 6 +- 5 files changed, 120 insertions(+), 33 deletions(-) diff --git a/.env.example b/.env.example index 475123d..372ebc4 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,7 @@ PYTORRENT_POLL_INTERVAL=1.0 PYTORRENT_WORKERS=16 PYTORRENT_GEOIP_DB=data/GeoLite2-City.mmdb PYTORRENT_ALLOW_UNSAFE_WERKZEUG=0 +PYTORRENT_SCGI_RETRIES=8 # Retention / Smart Queue PYTORRENT_TRAFFIC_HISTORY_RETENTION_DAYS=90 diff --git a/pytorrent/routes/api.py b/pytorrent/routes/api.py index 95f14d2..1e131dc 100644 --- a/pytorrent/routes/api.py +++ b/pytorrent/routes/api.py @@ -311,13 +311,13 @@ def _chunk_hashes(hashes: list[str], size: int = MOVE_BULK_MAX_HASHES) -> list[l return [hashes[index:index + safe_size] for index in range(0, len(hashes), safe_size)] -def enqueue_move_bulk_parts(profile: dict, data: dict) -> list[dict]: - # Note: Keeps the existing move action intact for normal batches, while large moves are queued as bulk-1, bulk-2, etc. - base_payload = enrich_bulk_payload(profile, "move", data) +def enqueue_bulk_parts(profile: dict, action_name: str, data: dict) -> list[dict]: + # Note: Jedna wspolna funkcja dzieli duze operacje move/remove na male, uporzadkowane party bez ruszania pozostalych akcji. + base_payload = enrich_bulk_payload(profile, action_name, data) hashes = base_payload.get("hashes") or [] chunks = _chunk_hashes(hashes) if len(chunks) <= 1: - job_id = enqueue("move", profile["id"], base_payload) + job_id = enqueue(action_name, profile["id"], base_payload) return [{"job_id": job_id, "label": "bulk-1", "part": 1, "parts": 1, "hashes": hashes, "hash_count": len(hashes)}] jobs = [] @@ -336,11 +336,21 @@ def enqueue_move_bulk_parts(profile: dict, data: dict) -> list[dict]: "items": [items_by_hash[h] for h in chunk if h in items_by_hash], }) payload["job_context"] = context - job_id = enqueue("move", profile["id"], payload) + job_id = enqueue(action_name, profile["id"], payload) jobs.append({"job_id": job_id, "label": context["bulk_label"], "part": index, "parts": len(chunks), "hashes": chunk, "hash_count": len(chunk)}) return jobs +def enqueue_move_bulk_parts(profile: dict, data: dict) -> list[dict]: + # Note: Zachowuje stary publiczny helper dla move, ale korzysta z tej samej logiki partycji. + return enqueue_bulk_parts(profile, "move", data) + + +def enqueue_remove_bulk_parts(profile: dict, data: dict) -> list[dict]: + # Note: Remove/rm dostaje identyczne dzielenie na party jak move, co zmniejsza load na rTorrent. + return enqueue_bulk_parts(profile, "remove", data) + + @bp.get("/profiles") def profiles_list(): return ok({"profiles": preferences.list_profiles(), "active": preferences.active_profile()}) @@ -475,9 +485,9 @@ def torrent_action(action_name: str): allowed = {"start", "pause", "stop", "resume", "recheck", "reannounce", "remove", "move", "set_label", "set_ratio_group"} if action_name not in allowed: return jsonify({"ok": False, "error": "Unknown action"}), 400 - if action_name == "move": - # Note: Large move requests are split into ordered bulk parts; smaller requests keep the old single-job response shape. - jobs = enqueue_move_bulk_parts(profile, data) + if action_name in {"move", "remove"}: + # Note: Large move/remove requests are split into ordered bulk parts; smaller requests keep the old single-job response shape. + jobs = enqueue_bulk_parts(profile, action_name, data) first_job_id = jobs[0]["job_id"] if jobs else None total_hashes = sum(int(job.get("hash_count") or 0) for job in jobs) return ok({ diff --git a/pytorrent/services/rtorrent.py b/pytorrent/services/rtorrent.py index 8464fed..b2af777 100644 --- a/pytorrent/services/rtorrent.py +++ b/pytorrent/services/rtorrent.py @@ -1,5 +1,6 @@ from __future__ import annotations +import errno import os import posixpath import socket @@ -53,24 +54,57 @@ class ScgiRtorrentClient: } header_blob = b"".join(k.encode() + b"\0" + v.encode() + b"\0" for k, v in headers.items()) payload = str(len(header_blob)).encode("ascii") + b":" + header_blob + b"," + body - with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock: - sock.settimeout(self.timeout) - sock.sendall(payload) - chunks: list[bytes] = [] - while True: - chunk = sock.recv(65536) - if not chunk: - break - chunks.append(chunk) - response = b"".join(chunks) - if not response: - raise ConnectionError("Empty response from rTorrent SCGI") - if b"\r\n\r\n" in response: - response = response.split(b"\r\n\r\n", 1)[1] - elif b"\n\n" in response: - response = response.split(b"\n\n", 1)[1] - result, _ = loads(response) - return result[0] if len(result) == 1 else result + attempts = _scgi_retry_attempts() + last_exc = None + for attempt in range(1, attempts + 1): + try: + with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock: + sock.settimeout(self.timeout) + sock.sendall(payload) + chunks: list[bytes] = [] + while True: + chunk = sock.recv(65536) + if not chunk: + break + chunks.append(chunk) + response = b"".join(chunks) + if not response: + raise ConnectionError("Empty response from rTorrent SCGI") + if b"\r\n\r\n" in response: + response = response.split(b"\r\n\r\n", 1)[1] + elif b"\n\n" in response: + response = response.split(b"\n\n", 1)[1] + result, _ = loads(response) + return result[0] if len(result) == 1 else result + except Exception as exc: + last_exc = exc + if attempt >= attempts or not _is_transient_scgi_error(exc): + raise + time.sleep(_scgi_retry_delay(attempt)) + raise last_exc or ConnectionError("rTorrent SCGI call failed") + + +def _scgi_retry_attempts() -> int: + # Note: Krotki retry/backoff chroni masowe operacje przed chwilowym Errno 111 przy wysokim loadzie rTorrent. + try: + return max(1, min(10, int(os.environ.get("PYTORRENT_SCGI_RETRIES", "5")))) + except Exception: + return 5 + + +def _scgi_retry_delay(attempt: int) -> float: + return min(5.0, 0.35 * (2 ** max(0, attempt - 1))) + + +def _is_transient_scgi_error(exc: Exception) -> bool: + # Note: Retry obejmuje typowe chwilowe bledy SCGI/socket, ale nie ukrywa bledow merytorycznych XML-RPC. + if isinstance(exc, (ConnectionRefusedError, ConnectionResetError, TimeoutError, socket.timeout)): + return True + err_no = getattr(exc, "errno", None) + if err_no in {errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT, errno.EHOSTUNREACH, errno.ENETUNREACH}: + return True + msg = str(exc).lower() + return any(text in msg for text in ("connection refused", "connection reset", "timed out", "empty response")) def client_for(profile: dict) -> ScgiRtorrentClient: @@ -159,7 +193,7 @@ def _run_remote_move(c: ScgiRtorrentClient, src: str, dst: str, poll_interval: f try: output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-move-poll", status_path) or "").strip() except Exception as exc: - if _is_rt_timeout_error(exc): + if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc): continue raise if not output: @@ -207,6 +241,46 @@ def _safe_rm_rf_path(path: str) -> str: return path +def _run_remote_rm(c: ScgiRtorrentClient, path: str, poll_interval: float = 2.0) -> None: + # Note: rm -rf dziala w tle po stronie rTorrent, wiec dlugie kasowanie nie trzyma jednego polaczenia SCGI. + token = uuid.uuid4().hex + status_path = f"/tmp/pytorrent-rm-{token}.status" + script = ( + 'target=$1; status=$2; tmp=${status}.tmp; ' + 'rm -f "$status" "$tmp"; ' + '( rc=0; ' + 'if [ -z "$target" ] || [ "$target" = "/" ] || [ "$target" = "." ]; then echo "unsafe remove target: $target" >&2; rc=5; ' + 'else rm -rf -- "$target" || rc=$?; fi; ' + 'if [ $rc -eq 0 ]; then printf "OK\n" > "$status"; else printf "ERR %s\n" "$rc" > "$status"; fi; ' + 'if [ -s "$tmp" ]; then cat "$tmp" >> "$status"; fi; ' + 'rm -f "$tmp" ) > "$tmp" 2>&1 &' + ) + poll_script = 'status=$1; [ -f "$status" ] && cat "$status" || true' + cleanup_script = 'rm -f "$1"' + _rt_execute_allow_timeout(c, "execute.throw", "sh", "-c", script, "pytorrent-rm-start", path, status_path) + while True: + time.sleep(max(0.25, poll_interval)) + try: + output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-rm-poll", status_path) or "").strip() + except Exception as exc: + if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc): + continue + raise + if not output: + continue + try: + _rt_execute(c, "execute.throw", "sh", "-c", cleanup_script, "pytorrent-rm-clean", status_path) + except Exception: + pass + first_line = output.splitlines()[0].strip() + if first_line == "OK": + return + if first_line.startswith("ERR"): + details = "\n".join(output.splitlines()[1:]).strip() + raise RuntimeError(details or first_line) + raise RuntimeError(output) + + def _remove_torrent_data(c: ScgiRtorrentClient, torrent_hash: str) -> dict: data_path = _safe_rm_rf_path(_torrent_data_path(c, torrent_hash)) try: @@ -217,7 +291,7 @@ def _remove_torrent_data(c: ScgiRtorrentClient, torrent_hash: str) -> dict: c.call("d.close", torrent_hash) except Exception: pass - _rt_execute(c, "execute.throw", "rm", "-rf", data_path) + _run_remote_rm(c, data_path) return {"hash": torrent_hash, "removed_path": data_path} diff --git a/pytorrent/services/workers.py b/pytorrent/services/workers.py index 560da78..5a0a3bc 100644 --- a/pytorrent/services/workers.py +++ b/pytorrent/services/workers.py @@ -236,9 +236,9 @@ def list_jobs(limit: int = 200, offset: int = 0): def cancel_job(job_id: str) -> bool: row = _job_row(job_id) - if not row or row["status"] in {"done", "cancelled"}: + if not row or row["status"] not in {"pending", "running"}: return False - # Awaryjne anulowanie: pending, running i failed można oznaczyć jako cancelled z poziomu użytkownika. + # Note: Emergency cancel ma sens tylko dla niedokonczonych zadan; failed/done zostaja tylko do retry albo czyszczenia logow. _set_job(job_id, "cancelled", finished=True) _emit("job_update", {"id": job_id, "status": "cancelled"}) return True @@ -254,7 +254,7 @@ def emergency_clear_jobs() -> int: # Awaryjne czyszczenie: najpierw zamyka aktywne zadania jako cancelled, potem czyści całą listę job logów. now = utcnow() with connect() as conn: - conn.execute("UPDATE jobs SET status='cancelled', error='Emergency cancelled by user', finished_at=COALESCE(finished_at, ?), updated_at=? WHERE status IN ('pending', 'running', 'failed')", (now, now)) + conn.execute("UPDATE jobs SET status='cancelled', error='Emergency cancelled by user', finished_at=COALESCE(finished_at, ?), updated_at=? WHERE status IN ('pending', 'running')", (now, now)) cur = conn.execute("DELETE FROM jobs") deleted = int(cur.rowcount or 0) _emit("job_update", {"status": "cleared", "emergency": True}) diff --git a/pytorrent/static/app.js b/pytorrent/static/app.js index 51ae53e..1fcb77c 100644 --- a/pytorrent/static/app.js +++ b/pytorrent/static/app.js @@ -217,7 +217,7 @@ function setSelectionRange(hash, keepExisting=false){ const current=visibleRows.findIndex(t=>t.hash===hash); const last=visibleRows.findIndex(t=>t.hash===lastSelectedHash); if(current<0 || last<0){ selected.add(hash); lastSelectedHash=hash; return; } if(!keepExisting) selected.clear(); const a=Math.min(current,last), b=Math.max(current,last); visibleRows.slice(a,b+1).forEach(t=>selected.add(t.hash)); selectedHash=hash; } async function post(url,data,method='POST'){ const res=await fetch(url,{method,headers:{'Content-Type':'application/json'},body:JSON.stringify(data||{})}); const json=await res.json(); if(!json.ok) throw new Error(json.error||'Operation failed'); return json; } - async function runAction(action, extra={}){ const hashes=selectedHashes(); if(!hashes.length) return toast('No torrents selected','warning'); let payload={hashes,...extra}; if(action==='move'){ openPathPicker('move'); return; } setBusy(true); try{ const j=await post(`/api/torrents/${action}`,payload); markQueuedJobs(j, hashes, action); if(action==='recheck'){ hashes.forEach(h=>{ const t=torrents.get(h); if(t) torrents.set(h,{...t,status:'Checking',hashing:1,message:'Force recheck queued'}); }); scheduleRender(true); } toast(`${action} queued`,'success'); if(action==='set_label') await loadLabels(); }catch(e){toast(e.message,'danger');} finally{setBusy(false);} } + async function runAction(action, extra={}){ const hashes=selectedHashes(); if(!hashes.length) return toast('No torrents selected','warning'); let payload={hashes,...extra}; if(action==='move'){ openPathPicker('move'); return; } setBusy(true); try{ const j=await post(`/api/torrents/${action}`,payload); markQueuedJobs(j, hashes, action); if(action==='recheck'){ hashes.forEach(h=>{ const t=torrents.get(h); if(t) torrents.set(h,{...t,status:'Checking',hashing:1,message:'Force recheck queued'}); }); scheduleRender(true); } const parts=Number(j.bulk_parts||1); toast(parts>1?`${action} queued in ${parts} bulk parts`:`${action} queued`,'success'); if(action==='set_label') await loadLabels(); }catch(e){toast(e.message,'danger');} finally{setBusy(false);} } function flag(iso){ const code=String(iso||'').toLowerCase(); return code?` ${esc(code.toUpperCase())}`:'-'; } function table(headers,rows){ return `${headers.map(h=>``).join('')}${rows.map(r=>`${r.map(c=>``).join('')}`).join('')}
${esc(h)}
${c}
`; } function renderGeneral(){ const t=torrents.get(selectedHash); const labels=t?labelNames(t.label).map(l=>` ${esc(l)}`).join(' '):''; $('detailPane').innerHTML=t?`
Name${esc(t.name)}
Hash${esc(t.hash)}
Path${esc(t.path)}
Size${esc(t.size_h)}
Progress${esc(t.progress)}%
Ratio${esc(t.ratio)}
Downloaded${esc(t.down_total_h)}
Uploaded${esc(t.up_total_h)}
Labels${labels||'-'}
Ratio group${esc(t.ratio_group||'')}
`:'Select a torrent.'; } @@ -317,8 +317,10 @@ $('saveColumnsBtn')?.addEventListener('click',async()=>{ document.querySelectorAll('.column-toggle').forEach(cb=>cb.checked?hiddenColumns.delete(cb.dataset.colKey):hiddenColumns.add(cb.dataset.colKey)); applyColumnVisibility(); scheduleRender(true); await post('/api/preferences',{table_columns_json:JSON.stringify({hidden:[...hiddenColumns]})}).catch(e=>toast(e.message,'danger')); toast('Columns saved','success'); }); $('resetColumnsBtn')?.addEventListener('click',async()=>{ hiddenColumns.clear(); renderColumnManager(); applyColumnVisibility(); scheduleRender(true); await post('/api/preferences',{table_columns_json:JSON.stringify({hidden:[]})}).catch(()=>{}); }); - async function loadJobs(page=jobsPage){ const box=$('jobsTable'); if(!box)return; jobsPage=Math.max(0,page|0); box.innerHTML=' Loading jobs...'; const offset=jobsPage*jobsLimit; const j=await (await fetch(`/api/jobs?limit=${jobsLimit}&offset=${offset}`)).json(); const rows=j.jobs||[]; jobsTotal=Number(j.total||rows.length); const details=r=>{ const count=Number(r.hash_count||0); if(r.is_bulk || count>1) return `bulk
${esc(count)} torrent(s), details hidden`; const bits=[]; if(count) bits.push(`${esc(count)} torrent`); if(r.summary) bits.push(esc(r.summary)); return bits.join('
') || '-'; }; box.innerHTML=table(['Status','Action','Profile','Count','Details','Attempts','Started','Finished','Error','Actions'],rows.map(r=>[`${esc(r.status)}`,esc(r.action),esc(r.profile_id),esc(r.hash_count||0),details(r),esc(r.attempts||0),dateCell(r.started_at||r.created_at),dateCell(r.finished_at||r.updated_at),compactCell(r.error||'',140),` `])); renderJobsPager(); } + function jobActions(r){ const id=esc(r.id); const status=String(r.status||''); const actions=[]; if(status==='failed'||status==='cancelled') actions.push(``); if(status==='pending'||status==='running') actions.push(``); return actions.join(' ') || '-'; } + async function loadJobs(page=jobsPage){ const box=$('jobsTable'); if(!box)return; jobsPage=Math.max(0,page|0); box.innerHTML=' Loading jobs...'; const offset=jobsPage*jobsLimit; const j=await (await fetch(`/api/jobs?limit=${jobsLimit}&offset=${offset}`)).json(); const rows=j.jobs||[]; jobsTotal=Number(j.total||rows.length); const details=r=>{ const count=Number(r.hash_count||0); if(r.is_bulk || count>1) return `bulk
${esc(count)} torrent(s), details hidden`; const bits=[]; if(count) bits.push(`${esc(count)} torrent`); if(r.summary) bits.push(esc(r.summary)); return bits.join('
') || '-'; }; box.innerHTML=table(['Status','Action','Profile','Count','Details','Attempts','Started','Finished','Error','Actions'],rows.map(r=>[`${esc(r.status)}`,esc(r.action),esc(r.profile_id),esc(r.hash_count||0),details(r),esc(r.attempts||0),dateCell(r.started_at||r.created_at),dateCell(r.finished_at||r.updated_at),compactCell(r.error||'',140),jobActions(r)])); renderJobsPager(); } function renderJobsPager(){ const p=$('jobsPager'); if(!p)return; const pages=Math.max(1,Math.ceil(jobsTotal/jobsLimit)); p.innerHTML=`
Page ${jobsPage+1} / ${pages} · ${jobsTotal} jobs
`; $('jobsPrev')?.addEventListener('click',()=>loadJobs(jobsPage-1)); $('jobsNext')?.addEventListener('click',()=>loadJobs(jobsPage+1)); } + // Note: Przyciski w job logu sa zalezne od statusu: failed ma retry, a emergency cancel tylko pending/running. $('jobsModal')?.addEventListener('show.bs.modal',loadJobs); $('refreshJobsBtn')?.addEventListener('click',loadJobs); $('jobsTable')?.addEventListener('click',async e=>{ const btn=e.target.closest('.job-retry,.job-cancel'); if(!btn)return; const id=btn.dataset.id; if(!id)return; if(btn.classList.contains('job-retry')) await post(`/api/jobs/${id}/retry`,{}).catch(x=>toast(x.message,'danger')); if(btn.classList.contains('job-cancel')){ const st=btn.dataset.status||''; if((st==='pending'||st==='running') && !confirm('Emergency cancel this unfinished job?')) return; await post(`/api/jobs/${id}/cancel`,{}).catch(x=>toast(x.message,'danger')); } loadJobs(); }); $('clearJobsBtn')?.addEventListener('click',async()=>{ const emergency=confirm('Emergency clear all job logs, including unfinished jobs? OK = emergency clear, Cancel = clear only finished logs.'); if(!emergency && !confirm('Clear finished job logs? Pending and running jobs will stay.')) return; try{ const j=await post(`/api/jobs/clear${emergency?'?force=1':''}`,{}); toast(`${emergency?'Emergency cleared':'Cleared'} ${j.deleted||0} job log(s)`,'success'); jobsPage=0; loadJobs(0); }catch(e){ toast(e.message,'danger'); } });