from __future__ import annotations import json import threading import time import uuid from concurrent.futures import ThreadPoolExecutor from . import rtorrent from .preferences import get_profile from ..config import WORKERS from ..db import connect, utcnow, default_user_id _executor = ThreadPoolExecutor(max_workers=WORKERS, thread_name_prefix="pytorrent-job") _socketio = None _semaphores: dict[int, threading.Semaphore] = {} _exclusive_locks: dict[int, threading.Lock] = {} _sem_lock = threading.Lock() def set_socketio(socketio): global _socketio _socketio = socketio def _emit(name: str, payload: dict): if _socketio: _socketio.emit(name, payload) def _get_sem(profile: dict) -> threading.Semaphore: profile_id = int(profile["id"]) max_parallel = max(1, int(profile.get("max_parallel_jobs") or 3)) with _sem_lock: if profile_id not in _semaphores: _semaphores[profile_id] = threading.Semaphore(max_parallel) return _semaphores[profile_id] def _get_exclusive_lock(profile_id: int) -> threading.Lock: with _sem_lock: if profile_id not in _exclusive_locks: _exclusive_locks[profile_id] = threading.Lock() return _exclusive_locks[profile_id] def _job_row(job_id: str): with connect() as conn: return conn.execute("SELECT rowid AS _rowid, * FROM jobs WHERE id=?", (job_id,)).fetchone() def _is_ordered_action(action_name: str) -> bool: return action_name in {"move", "remove"} def _has_prior_ordered_jobs(profile_id: int, rowid: int) -> bool: with connect() as conn: row = conn.execute( """ SELECT 1 FROM jobs WHERE profile_id=? AND rowid bool: while _has_prior_ordered_jobs(profile_id, rowid): fresh = _job_row(job_id) if not fresh or fresh["status"] == "cancelled": return False time.sleep(0.5) return True def _set_job(job_id: str, status: str, error: str = "", result: dict | None = None, started: bool = False, finished: bool = False): now = utcnow() fields = ["status=?", "error=?", "updated_at=?"] values: list = [status, error, now] if result is not None: fields.append("result_json=?") values.append(json.dumps(result)) if started: fields.append("started_at=?") values.append(now) if finished: fields.append("finished_at=?") values.append(now) values.append(job_id) with connect() as conn: conn.execute(f"UPDATE jobs SET {', '.join(fields)} WHERE id=?", values) def enqueue(action_name: str, profile_id: int, payload: dict, user_id: int | None = None, max_attempts: int = 2) -> str: user_id = user_id or default_user_id() job_id = uuid.uuid4().hex now = utcnow() with connect() as conn: conn.execute( "INSERT INTO jobs(id,user_id,profile_id,action,payload_json,status,attempts,max_attempts,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?)", (job_id, user_id, profile_id, action_name, json.dumps(payload), "pending", 0, max_attempts, now, now), ) _emit("job_update", {"id": job_id, "action": action_name, "profile_id": profile_id, "status": "pending"}) _executor.submit(_run, job_id) return job_id def _execute(profile: dict, action_name: str, payload: dict): if action_name == "add_magnet": return rtorrent.add_magnet(profile, payload["uri"], bool(payload.get("start", True)), str(payload.get("directory") or ""), str(payload.get("label") or "")) if action_name == "add_torrent_raw": import base64 raw = base64.b64decode(payload["data_b64"]) return rtorrent.add_torrent_raw(profile, raw, bool(payload.get("start", True)), str(payload.get("directory") or ""), str(payload.get("label") or "")) if action_name == "set_limits": return rtorrent.set_limits(profile, payload.get("down"), payload.get("up")) hashes = payload.get("hashes") or [] return rtorrent.action(profile, hashes, action_name, payload) def _run(job_id: str): job = _job_row(job_id) if not job or job["status"] == "cancelled": return profile = get_profile(int(job["profile_id"]), int(job["user_id"])) if not profile: _set_job(job_id, "failed", "rTorrent profile does not exist", finished=True) _emit("job_update", {"id": job_id, "status": "failed", "error": "profile not found"}) return profile_id = int(profile["id"]) ordered_lock = None if _is_ordered_action(str(job["action"])): if not _wait_for_prior_ordered_jobs(job_id, profile_id, int(job["_rowid"])): return ordered_lock = _get_exclusive_lock(profile_id) ordered_lock.acquire() sem = _get_sem(profile) sem.acquire() try: job = _job_row(job_id) if not job or job["status"] == "cancelled": return payload = json.loads(job.get("payload_json") or "{}") attempts = int(job.get("attempts") or 0) + 1 with connect() as conn: conn.execute("UPDATE jobs SET status='running', attempts=?, started_at=COALESCE(started_at, ?), updated_at=? WHERE id=?", (attempts, utcnow(), utcnow(), job_id)) _emit("operation_started", {"job_id": job_id, "action": job["action"], "profile_id": profile["id"], "hashes": payload.get("hashes") or [], "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1}) _emit("job_update", {"id": job_id, "status": "running", "attempts": attempts}) result = _execute(profile, job["action"], payload) _set_job(job_id, "done", result=result, finished=True) _emit("operation_finished", {"job_id": job_id, "action": job["action"], "profile_id": profile["id"], "hashes": payload.get("hashes") or [], "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1, "result": result}) _emit("job_update", {"id": job_id, "status": "done", "result": result}) except Exception as exc: fresh = _job_row(job_id) or {} attempts = int(fresh.get("attempts") or 1) max_attempts = int(fresh.get("max_attempts") or 2) status = "pending" if attempts < max_attempts else "failed" _set_job(job_id, status, str(exc), finished=(status == "failed")) _emit("operation_failed", {"job_id": job_id, "action": job.get("action"), "profile_id": job.get("profile_id"), "hashes": payload.get("hashes") or [], "error": str(exc)}) _emit("job_update", {"id": job_id, "status": status, "error": str(exc), "attempts": attempts}) if status == "pending": _executor.submit(_run, job_id) finally: sem.release() if ordered_lock: ordered_lock.release() def _safe_json(value, fallback): try: return json.loads(value or "") except Exception: return fallback def _job_summary(row: dict, payload: dict, result: dict) -> str: ctx = payload.get("job_context") or {} count = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0) parts = [] if count: parts.append(("bulk " if count > 1 else "single ") + f"{count} torrent(s)") if ctx.get("target_path"): parts.append(f"target: {ctx.get('target_path')}") if ctx.get("remove_data"): parts.append("remove data") if ctx.get("move_data"): parts.append("move data") if result.get("count") is not None: parts.append(f"done: {result.get('count')}") if result.get("errors"): parts.append(f"errors: {len(result.get('errors') or [])}") return "; ".join(parts) def _public_job(row) -> dict: d = dict(row) payload = _safe_json(d.get("payload_json"), {}) result = _safe_json(d.get("result_json"), {}) ctx = payload.get("job_context") or {} d["payload"] = payload d["result"] = result d["hash_count"] = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0) d["is_bulk"] = bool(ctx.get("bulk") or d["hash_count"] > 1) d["summary"] = _job_summary(d, payload, result) items = ctx.get("items") or [] if d["is_bulk"]: d["items_preview"] = "" else: d["items_preview"] = ", ".join([str((x or {}).get("name") or (x or {}).get("hash") or "") for x in items[:1] if x]) return d def list_jobs(limit: int = 200, offset: int = 0): limit = max(1, min(int(limit or 50), 500)) offset = max(0, int(offset or 0)) with connect() as conn: rows = conn.execute("SELECT * FROM jobs ORDER BY created_at DESC LIMIT ? OFFSET ?", (limit, offset)).fetchall() total = conn.execute("SELECT COUNT(*) AS n FROM jobs").fetchone()["n"] return {"rows": [_public_job(r) for r in rows], "total": total, "limit": limit, "offset": offset} def cancel_job(job_id: str) -> bool: row = _job_row(job_id) if not row or row["status"] not in {"pending", "failed"}: return False _set_job(job_id, "cancelled", finished=True) _emit("job_update", {"id": job_id, "status": "cancelled"}) return True def clear_jobs() -> int: with connect() as conn: cur = conn.execute("DELETE FROM jobs WHERE status NOT IN ('pending', 'running')") return int(cur.rowcount or 0) def retry_job(job_id: str) -> bool: row = _job_row(job_id) if not row or row["status"] not in {"failed", "cancelled"}: return False with connect() as conn: conn.execute("UPDATE jobs SET status='pending', error='', finished_at=NULL, updated_at=? WHERE id=?", (utcnow(), job_id)) _emit("job_update", {"id": job_id, "status": "pending"}) _executor.submit(_run, job_id) return True