# Job queue: background execution of rTorrent actions with per-profile concurrency limits.
from __future__ import annotations
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from . import rtorrent
|
|
from .preferences import get_profile
|
|
from ..config import WORKERS
|
|
from ..db import connect, utcnow, default_user_id
|
|
|
|
# Shared worker pool for all background jobs; WORKERS caps process-wide concurrency.
_executor = ThreadPoolExecutor(max_workers=WORKERS, thread_name_prefix="pytorrent-job")

# Socket.IO server instance, injected via set_socketio(); stays None until the app wires it up.
_socketio = None

# Per-profile concurrency limiters: profile id -> Semaphore sized from the profile's
# max_parallel_jobs setting (created lazily by _get_sem).
_semaphores: dict[int, threading.Semaphore] = {}

# Per-profile exclusive locks used to serialize ordered actions ('move'/'remove').
_exclusive_locks: dict[int, threading.Lock] = {}

# Guards lazy creation of entries in _semaphores and _exclusive_locks.
_sem_lock = threading.Lock()
def set_socketio(socketio):
    """Register the Socket.IO server used to broadcast job events."""
    global _socketio
    _socketio = socketio
def _emit(name: str, payload: dict):
    """Broadcast *payload* on event *name*; a no-op when no Socket.IO server is registered."""
    if not _socketio:
        return
    _socketio.emit(name, payload)
def _get_sem(profile: dict) -> threading.Semaphore:
    """Return the per-profile semaphore, creating it on first use.

    The semaphore size comes from the profile's ``max_parallel_jobs``
    (minimum 1, default 3) and is fixed when the semaphore is created.
    """
    pid = int(profile["id"])
    limit = max(1, int(profile.get("max_parallel_jobs") or 3))
    with _sem_lock:
        sem = _semaphores.get(pid)
        if sem is None:
            sem = threading.Semaphore(limit)
            _semaphores[pid] = sem
        return sem
def _get_exclusive_lock(profile_id: int) -> threading.Lock:
    """Return the profile's exclusive lock, creating it lazily under _sem_lock."""
    with _sem_lock:
        return _exclusive_locks.setdefault(profile_id, threading.Lock())
def _job_row(job_id: str):
    """Fetch the job row (including its rowid as ``_rowid``), or None when absent."""
    query = "SELECT rowid AS _rowid, * FROM jobs WHERE id=?"
    with connect() as conn:
        cursor = conn.execute(query, (job_id,))
        return cursor.fetchone()
def _is_ordered_action(action_name: str) -> bool:
|
|
return action_name in {"move", "remove"}
|
|
|
|
|
|
def _has_prior_ordered_jobs(profile_id: int, rowid: int) -> bool:
    """Whether an earlier-queued move/remove job for this profile is still pending or running."""
    with connect() as conn:
        hit = conn.execute(
            """
            SELECT 1
            FROM jobs
            WHERE profile_id=?
              AND rowid<?
              AND action IN ('move', 'remove')
              AND status IN ('pending', 'running')
            LIMIT 1
            """,
            (profile_id, rowid),
        ).fetchone()
    return hit is not None
def _wait_for_prior_ordered_jobs(job_id: str, profile_id: int, rowid: int) -> bool:
    """Block until no earlier ordered job for the profile is queued or running.

    Polls the database every 0.5 s. Returns False if the job disappeared or
    was cancelled while waiting; True once it is clear to proceed.
    """
    while True:
        if not _has_prior_ordered_jobs(profile_id, rowid):
            return True
        current = _job_row(job_id)
        if not current or current["status"] == "cancelled":
            return False
        time.sleep(0.5)
def _set_job(job_id: str, status: str, error: str = "", result: dict | None = None, started: bool = False, finished: bool = False):
    """Update a job row's status and error, plus optional result and timestamps.

    ``started``/``finished`` stamp started_at/finished_at with the current time;
    ``result`` (when given) is serialized into result_json.
    """
    now = utcnow()
    assignments = ["status=?", "error=?", "updated_at=?"]
    params: list = [status, error, now]
    if result is not None:
        assignments.append("result_json=?")
        params.append(json.dumps(result))
    if started:
        assignments.append("started_at=?")
        params.append(now)
    if finished:
        assignments.append("finished_at=?")
        params.append(now)
    params.append(job_id)
    sql = f"UPDATE jobs SET {', '.join(assignments)} WHERE id=?"
    with connect() as conn:
        conn.execute(sql, params)
def enqueue(action_name: str, profile_id: int, payload: dict, user_id: int | None = None, max_attempts: int = 2) -> str:
    """Persist a new pending job, notify listeners, and schedule it on the worker pool.

    Returns the generated job id.
    """
    uid = user_id or default_user_id()
    job_id = uuid.uuid4().hex
    now = utcnow()
    row = (job_id, uid, profile_id, action_name, json.dumps(payload), "pending", 0, max_attempts, now, now)
    with connect() as conn:
        conn.execute(
            "INSERT INTO jobs(id,user_id,profile_id,action,payload_json,status,attempts,max_attempts,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?)",
            row,
        )
    _emit("job_update", {"id": job_id, "action": action_name, "profile_id": profile_id, "status": "pending"})
    _executor.submit(_run, job_id)
    return job_id
def _execute(profile: dict, action_name: str, payload: dict):
    """Dispatch one job action against rTorrent and return its raw result."""
    if action_name == "add_magnet":
        start = bool(payload.get("start", True))
        directory = str(payload.get("directory") or "")
        label = str(payload.get("label") or "")
        return rtorrent.add_magnet(profile, payload["uri"], start, directory, label)
    if action_name == "add_torrent_raw":
        import base64

        raw = base64.b64decode(payload["data_b64"])
        start = bool(payload.get("start", True))
        directory = str(payload.get("directory") or "")
        label = str(payload.get("label") or "")
        return rtorrent.add_torrent_raw(profile, raw, start, directory, label)
    if action_name == "set_limits":
        return rtorrent.set_limits(profile, payload.get("down"), payload.get("up"))
    # Everything else is a generic per-hash torrent action.
    return rtorrent.action(profile, payload.get("hashes") or [], action_name, payload)
def _run(job_id: str):
    """Worker entry point: claim, execute, and finalize a single job.

    Honors the per-profile concurrency semaphore, serializes ordered actions
    ('move'/'remove') per profile via an exclusive lock, re-checks for user
    cancellation before and after the actual work, and re-submits failed jobs
    until max_attempts is exhausted.
    """
    job = _job_row(job_id)
    if not job or job["status"] == "cancelled":
        return
    profile = get_profile(int(job["profile_id"]), int(job["user_id"]))
    if not profile:
        _set_job(job_id, "failed", "rTorrent profile does not exist", finished=True)
        _emit("job_update", {"id": job_id, "status": "failed", "error": "profile not found"})
        return
    profile_id = int(profile["id"])
    ordered_lock = None
    if _is_ordered_action(str(job["action"])):
        # Ordered actions must run strictly in enqueue (rowid) order per profile.
        if not _wait_for_prior_ordered_jobs(job_id, profile_id, int(job["_rowid"])):
            return
        ordered_lock = _get_exclusive_lock(profile_id)
        ordered_lock.acquire()
    sem = _get_sem(profile)
    sem.acquire()
    # Bugfix: pre-bind payload so the except handler cannot hit a NameError
    # (and mask the real error) when the failure happens before json.loads below.
    payload: dict = {}
    try:
        job = _job_row(job_id)
        if not job or job["status"] == "cancelled":
            return
        payload = json.loads(job.get("payload_json") or "{}")
        attempts = int(job.get("attempts") or 0) + 1
        with connect() as conn:
            conn.execute("UPDATE jobs SET status='running', attempts=?, started_at=COALESCE(started_at, ?), updated_at=? WHERE id=?", (attempts, utcnow(), utcnow(), job_id))
        hashes = payload.get("hashes") or []
        _emit("operation_started", {"job_id": job_id, "action": job["action"], "profile_id": profile["id"], "hashes": hashes, "hash_count": len(hashes), "bulk": len(hashes) > 1})
        _emit("job_update", {"id": job_id, "status": "running", "attempts": attempts})
        result = _execute(profile, job["action"], payload)
        fresh = _job_row(job_id)
        # Emergency cancel: if the user cancelled the job mid-flight, the result
        # must not overwrite the cancelled status.
        if fresh and fresh["status"] == "cancelled":
            return
        _set_job(job_id, "done", result=result, finished=True)
        _emit("operation_finished", {"job_id": job_id, "action": job["action"], "profile_id": profile["id"], "hashes": hashes, "hash_count": len(hashes), "bulk": len(hashes) > 1, "result": result})
        _emit("job_update", {"id": job_id, "status": "done", "result": result})
    except Exception as exc:
        fresh = _job_row(job_id) or {}
        attempts = int(fresh.get("attempts") or 1)
        max_attempts = int(fresh.get("max_attempts") or 2)
        # Emergency cancel: an exception from a cancelled job must not move it
        # back to retry or failed.
        if fresh and fresh.get("status") == "cancelled":
            return
        status = "pending" if attempts < max_attempts else "failed"
        _set_job(job_id, status, str(exc), finished=(status == "failed"))
        _emit("operation_failed", {"job_id": job_id, "action": job.get("action"), "profile_id": job.get("profile_id"), "hashes": payload.get("hashes") or [], "error": str(exc)})
        _emit("job_update", {"id": job_id, "status": status, "error": str(exc), "attempts": attempts})
        if status == "pending":
            # Retry by re-submitting to the pool; attempts was already bumped.
            _executor.submit(_run, job_id)
    finally:
        sem.release()
        if ordered_lock:
            ordered_lock.release()
def _safe_json(value, fallback):
|
|
try:
|
|
return json.loads(value or "")
|
|
except Exception:
|
|
return fallback
|
|
|
|
|
|
def _job_summary(row: dict, payload: dict, result: dict) -> str:
|
|
ctx = payload.get("job_context") or {}
|
|
count = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0)
|
|
parts = []
|
|
if count:
|
|
parts.append(("bulk " if count > 1 else "single ") + f"{count} torrent(s)")
|
|
if ctx.get("target_path"):
|
|
parts.append(f"target: {ctx.get('target_path')}")
|
|
if ctx.get("remove_data"):
|
|
parts.append("remove data")
|
|
if ctx.get("move_data"):
|
|
parts.append("move data")
|
|
if result.get("count") is not None:
|
|
parts.append(f"done: {result.get('count')}")
|
|
if result.get("errors"):
|
|
parts.append(f"errors: {len(result.get('errors') or [])}")
|
|
return "; ".join(parts)
|
|
|
|
|
|
def _public_job(row) -> dict:
    """Expand a raw job row into the dict shape consumed by the API/UI."""
    job = dict(row)
    payload = _safe_json(job.get("payload_json"), {})
    result = _safe_json(job.get("result_json"), {})
    ctx = payload.get("job_context") or {}
    job["payload"] = payload
    job["result"] = result
    job["hash_count"] = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0)
    job["is_bulk"] = bool(ctx.get("bulk") or job["hash_count"] > 1)
    job["summary"] = _job_summary(job, payload, result)
    items = ctx.get("items") or []
    if job["is_bulk"]:
        job["items_preview"] = ""
    else:
        # Single-item jobs preview the first item's name (or hash as fallback).
        names = [str((entry or {}).get("name") or (entry or {}).get("hash") or "") for entry in items[:1] if entry]
        job["items_preview"] = ", ".join(names)
    return job
def list_jobs(limit: int = 200, offset: int = 0):
    """Return a page of jobs (newest first) plus paging metadata.

    ``limit`` is clamped to [1, 500] (defaulting to 50 when falsy);
    ``offset`` is clamped to be non-negative.
    """
    limit = max(1, min(int(limit or 50), 500))
    offset = max(0, int(offset or 0))
    with connect() as conn:
        rows = conn.execute("SELECT * FROM jobs ORDER BY created_at DESC LIMIT ? OFFSET ?", (limit, offset)).fetchall()
        total = conn.execute("SELECT COUNT(*) AS n FROM jobs").fetchone()["n"]
    public_rows = [_public_job(r) for r in rows]
    return {"rows": public_rows, "total": total, "limit": limit, "offset": offset}
def cancel_job(job_id: str) -> bool:
    """Mark a job as cancelled; pending, running and failed jobs all qualify.

    Returns False when the job is missing or already done/cancelled.
    """
    row = _job_row(job_id)
    if not row:
        return False
    if row["status"] in ("done", "cancelled"):
        return False
    _set_job(job_id, "cancelled", finished=True)
    _emit("job_update", {"id": job_id, "status": "cancelled"})
    return True
def clear_jobs() -> int:
    """Delete all finished job rows (keeping pending/running); return the delete count."""
    with connect() as conn:
        cursor = conn.execute("DELETE FROM jobs WHERE status NOT IN ('pending', 'running')")
        return int(cursor.rowcount or 0)
def emergency_clear_jobs() -> int:
    """Emergency wipe: first close out active jobs as cancelled, then delete the whole job log.

    Returns the number of deleted rows.
    """
    now = utcnow()
    with connect() as conn:
        conn.execute("UPDATE jobs SET status='cancelled', error='Emergency cancelled by user', finished_at=COALESCE(finished_at, ?), updated_at=? WHERE status IN ('pending', 'running', 'failed')", (now, now))
        deleted = int(conn.execute("DELETE FROM jobs").rowcount or 0)
    _emit("job_update", {"status": "cleared", "emergency": True})
    return deleted
def retry_job(job_id: str) -> bool:
    """Reset a failed/cancelled job back to pending and reschedule it.

    Returns False when the job is missing or not in a retryable state.
    """
    row = _job_row(job_id)
    if not row:
        return False
    if row["status"] not in ("failed", "cancelled"):
        return False
    with connect() as conn:
        conn.execute("UPDATE jobs SET status='pending', error='', finished_at=NULL, updated_at=? WHERE id=?", (utcnow(), job_id))
    _emit("job_update", {"id": job_id, "status": "pending"})
    _executor.submit(_run, job_id)
    return True