diff --git a/pytorrent/services/download_planner.py b/pytorrent/services/download_planner.py index aa08c20..c28d346 100644 --- a/pytorrent/services/download_planner.py +++ b/pytorrent/services/download_planner.py @@ -1,5 +1,6 @@ from __future__ import annotations import json +import threading import time import psutil from datetime import datetime, timezone @@ -46,6 +47,29 @@ _LAST_RUN: dict[int, float] = {} _LAST_LIMITS: dict[int, tuple[int, int]] = {} _HIGH_CPU_SINCE: dict[int, float] = {} _PLANNER_CONNECTION_STATUS: dict[int, str] = {} +_SCHEDULER_STARTED = False +_SCHEDULER_LOCK = threading.Lock() +_PROFILE_LOCKS: dict[int, threading.Lock] = {} +_PROFILE_LOCKS_GUARD = threading.Lock() + + +def _profile_lock(profile_id: int) -> threading.Lock: + """Keep one planner run per profile active at a time.""" + with _PROFILE_LOCKS_GUARD: + if profile_id not in _PROFILE_LOCKS: + _PROFILE_LOCKS[profile_id] = threading.Lock() + return _PROFILE_LOCKS[profile_id] + + +def _all_profiles() -> list[dict]: + """Read every configured profile directly from DB for browser-independent background work.""" + with connect() as conn: + return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()] + + +def _owner_user_id(profile: dict) -> int: + """Use the profile owner for background planner checks.""" + return int(profile.get("user_id") or default_user_id()) def _rtorrent_ready(profile: dict) -> tuple[bool, str]: @@ -580,26 +604,42 @@ def preview(profile: dict, user_id: int | None = None) -> dict: def start_scheduler(socketio=None) -> None: + """Start the browser-independent planner loop for every configured profile.""" + global _SCHEDULER_STARTED + with _SCHEDULER_LOCK: + if _SCHEDULER_STARTED: + return + _SCHEDULER_STARTED = True + def loop(): while True: try: from .websocket import emit_profile_event - profiles: list[dict] - with connect() as conn: - profiles = [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()] - for profile in profiles: + for profile in _all_profiles(): + profile_id = int(profile.get("id") or 0) + if not profile_id: + continue + lock = _profile_lock(profile_id) + if not lock.acquire(blocking=False): + continue try: - result = enforce(profile, force=False) + # Note: Background planner runs per configured profile with the profile owner, not only for the active UI profile. + result = enforce(profile, force=False, user_id=_owner_user_id(profile)) if socketio and result.get("enabled") and not result.get("skipped"): - emit_profile_event(socketio, "download_plan_update", result, int(profile["id"])) + emit_profile_event(socketio, "download_plan_update", result, profile_id) except Exception as exc: if socketio: - emit_profile_event(socketio, "download_plan_update", {"ok": False, "profile_id": int(profile.get("id") or 0), "error": str(exc)}, int(profile.get("id") or 0)) + emit_profile_event(socketio, "download_plan_update", {"ok": False, "profile_id": profile_id, "error": str(exc)}, profile_id) + finally: + lock.release() except Exception: pass if socketio: socketio.sleep(30) else: time.sleep(30) + if socketio: socketio.start_background_task(loop) + else: + threading.Thread(target=loop, daemon=True, name="pytorrent-download-planner-scheduler").start() diff --git a/pytorrent/services/ratio_rules.py b/pytorrent/services/ratio_rules.py index 18fa48f..171722b 100644 --- a/pytorrent/services/ratio_rules.py +++ b/pytorrent/services/ratio_rules.py @@ -137,10 +137,12 @@ def start_scheduler(socketio=None) -> None: profile_id = int(row["profile_id"]) with connect() as conn: owner = conn.execute("SELECT user_id FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone() - profile = get_profile(profile_id, int(owner["user_id"] if owner and owner.get("user_id") else default_user_id())) + owner_id = int(owner["user_id"] if owner and owner.get("user_id") else default_user_id()) + profile = get_profile(profile_id, owner_id) if not profile: continue - result = check(profile) + # Note: Ratio rules are evaluated per profile owner, not the active browser user. + result = check(profile, user_id=owner_id) if socketio and result.get("applied"): socketio.emit("ratio_rules_checked", {"profile_id": profile["id"], **result}, to=f"profile:{profile['id']}") except Exception: diff --git a/pytorrent/services/rss.py b/pytorrent/services/rss.py index 9bb01b5..1603460 100644 --- a/pytorrent/services/rss.py +++ b/pytorrent/services/rss.py @@ -203,9 +203,11 @@ def start_scheduler(socketio=None) -> None: profile_id = int(row["profile_id"]) with connect() as conn: owner = conn.execute("SELECT user_id FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone() - profile = get_profile(profile_id, int(owner["user_id"] if owner and owner.get("user_id") else default_user_id())) + owner_id = int(owner["user_id"] if owner and owner.get("user_id") else default_user_id()) + profile = get_profile(profile_id, owner_id) if profile: - result = check(profile, only_due=True) + # Note: RSS jobs run with the profile owner in background mode, independent of browser activity. + result = check(profile, user_id=owner_id, only_due=True) if socketio and result.get("queued"): socketio.emit("rss_checked", {"profile_id": profile["id"], **result}, to=f"profile:{profile['id']}") except Exception: diff --git a/pytorrent/services/startup_config.py b/pytorrent/services/startup_config.py index 3153ea9..12fc121 100644 --- a/pytorrent/services/startup_config.py +++ b/pytorrent/services/startup_config.py @@ -74,9 +74,12 @@ def schedule_startup_config_apply(socketio, delay_seconds: int = 60, retry_secon socketio.sleep(max(0, int(delay_seconds))) started_at = monotonic() while True: + failed_profile_id = 0 try: profiles = _profiles() for profile in profiles: + failed_profile_id = int(profile.get("id") or 0) + # Note: Startup config applies per profile after connectivity is detected; it does not depend on the active UI profile. _apply_profile(socketio, profile) pending = [int(profile.get("id") or 0) for profile in profiles if int(profile.get("id") or 0) not in _applied_profiles] if not pending or monotonic() - started_at >= max(0, int(max_wait_seconds)): @@ -87,7 +90,7 @@ def schedule_startup_config_apply(socketio, delay_seconds: int = 60, retry_secon return except Exception as exc: operation_logs.record( - None, + failed_profile_id or None, "rtorrent_config_startup", f"rTorrent startup config scheduler failed: {exc}", severity="warning", @@ -95,7 +98,7 @@ def schedule_startup_config_apply(socketio, delay_seconds: int = 60, retry_secon action="rtorrent_config", details={"error": str(exc)}, ) - socketio.emit("rtorrent_config_applied", {"ok": False, "profile_id": int(profile_id or 0), "error": str(exc)}, to=f"profile:{int(profile_id or 0)}" if profile_id else None) + socketio.emit("rtorrent_config_applied", {"ok": False, "profile_id": int(failed_profile_id or 0), "error": str(exc)}, to=f"profile:{int(failed_profile_id)}" if failed_profile_id else None) socketio.sleep(max(5, int(retry_seconds))) socketio.start_background_task(runner) diff --git a/pytorrent/services/websocket.py b/pytorrent/services/websocket.py index b08e81a..3700818 100644 --- a/pytorrent/services/websocket.py +++ b/pytorrent/services/websocket.py @@ -16,11 +16,9 @@ def _profile_room(profile_id: int) -> str: def _poller_profiles() -> list[dict]: - if not auth.enabled(): - profile = active_profile() - return [profile] if profile else [] from ..db import connect with connect() as conn: + # Note: Background polling must be profile-scoped and browser-independent, even when auth is disabled. return conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall() @@ -33,11 +31,20 @@ def _emit_profile(socketio, event: str, payload: dict, profile_id: int) -> None: emit_profile_event(socketio, event, payload, profile_id) -def _apply_configured_speed_limits(profile: dict) -> None: - limits = profile_speed_limits.get_limits(int(profile.get("id") or 0)) +_speed_limits_applied: dict[int, tuple[int, int]] = {} + + +def _apply_configured_speed_limits(profile: dict, *, force: bool = False) -> None: + profile_id = int(profile.get("id") or 0) + limits = profile_speed_limits.get_limits(profile_id) if not limits.get("configured"): return + key = (int(limits.get("down") or 0), int(limits.get("up") or 0)) + if not force and _speed_limits_applied.get(profile_id) == key: + return + # Note: Persisted per-profile limits are applied by the backend poller, not only after browser profile selection. rtorrent.set_limits(profile, limits.get("down"), limits.get("up")) + _speed_limits_applied[profile_id] = key def _run_slow_profile_tasks(socketio, profile: dict, profile_id: int) -> None: @@ -60,7 +67,7 @@ def _run_slow_profile_tasks(socketio, profile: dict, profile_id: int) -> None: except Exception as exc: _emit_profile(socketio, "smart_queue_update", {"ok": False, "profile_id": profile_id, "error": str(exc)}, profile_id) try: - auto_result = automation_rules.check(profile, force=False) + auto_result = automation_rules.check(profile, user_id=profile_user_id, force=False) if auto_result.get("applied") or auto_result.get("batches"): _emit_profile(socketio, "automation_update", auto_result, profile_id) except Exception as exc: @@ -145,6 +152,8 @@ def register_socketio_handlers(socketio): heartbeat = {"ok": True, "profile_id": pid, "tick": state.tick_count + 1, "error": ""} try: + # Note: This keeps per-profile runtime limits active after app start, without waiting for UI contact. + _apply_configured_speed_limits(profile) rows = torrent_cache.snapshot(pid) speed_status = _speed_status_from_rows(pid, rows) @@ -280,7 +289,7 @@ def register_socketio_handlers(socketio): emit("profile_required", {"ok": True, "profiles": []}) return try: - _apply_configured_speed_limits(profile) + _apply_configured_speed_limits(profile, force=True) except Exception as exc: emit("rtorrent_error", {"profile_id": profile["id"], "error": str(exc)}) rows = torrent_cache.snapshot(profile["id"]) @@ -306,7 +315,7 @@ def register_socketio_handlers(socketio): return join_room(_profile_room(profile_id)) try: - _apply_configured_speed_limits(profile) + _apply_configured_speed_limits(profile, force=True) except Exception as exc: emit("rtorrent_error", {"profile_id": profile_id, "error": str(exc)}) diff = torrent_cache.refresh(profile) diff --git a/pytorrent/services/workers.py b/pytorrent/services/workers.py index 371ef69..6619613 100644 --- a/pytorrent/services/workers.py +++ b/pytorrent/services/workers.py @@ -112,24 +112,55 @@ def _is_light_job(row) -> bool: return _is_light_action(str((row or {}).get("action") or "")) -def _has_prior_ordered_jobs(profile_id: int, rowid: int) -> bool: +def _ordered_profile_ids(row) -> set[int]: + """Return every profile touched by an ordered job.""" + # Note: Profile-transfer jobs touch both source and target profiles, so they must be ordered across both sides. + ids: set[int] = set() + try: + profile_id = int((row or {}).get("profile_id") or 0) + if profile_id: + ids.add(profile_id) + except Exception: + pass + try: + payload = _job_payload(row) + target_id = int(payload.get("target_profile_id") or 0) + if str((row or {}).get("action") or "") == "profile_transfer" and target_id: + ids.add(target_id) + except Exception: + pass + return ids + + +def _ordered_locks_for(row) -> list[threading.Lock]: + """Acquire locks in stable order to avoid deadlocks between cross-profile jobs.""" + return [_get_exclusive_lock(profile_id) for profile_id in sorted(_ordered_profile_ids(row))] + + +def _has_prior_ordered_jobs(profile_ids: set[int], rowid: int) -> bool: + if not profile_ids: + return False with connect() as conn: rows = conn.execute( """ - SELECT rowid AS _rowid, action, payload_json + SELECT rowid AS _rowid, profile_id, action, payload_json FROM jobs - WHERE profile_id=? - AND rowid bool: - while _has_prior_ordered_jobs(profile_id, rowid): +def _wait_for_prior_ordered_jobs(job_id: str, profile_ids: set[int], rowid: int) -> bool: + while _has_prior_ordered_jobs(profile_ids, rowid): fresh = _job_row(job_id) if not fresh or fresh["status"] == "cancelled": return False @@ -379,7 +410,7 @@ def _run(job_id: str): if not _claim_runner(job_id): return sem = None - ordered_lock = None + ordered_locks: list[threading.Lock] = [] job = {} payload = {} try: @@ -394,10 +425,12 @@ def _run(job_id: str): return profile_id = int(profile["id"]) if _is_ordered_job(job) and not _is_priority_job(job): - if not _wait_for_prior_ordered_jobs(job_id, profile_id, int(job["_rowid"])): + involved_profile_ids = _ordered_profile_ids(job) + if not _wait_for_prior_ordered_jobs(job_id, involved_profile_ids, int(job["_rowid"])): return - ordered_lock = _get_exclusive_lock(profile_id) - ordered_lock.acquire() + ordered_locks = _ordered_locks_for(job) + for lock in ordered_locks: + lock.acquire() sem = _get_sem(profile, light=_is_light_job(job)) sem.acquire() job = _job_row(job_id) @@ -454,8 +487,8 @@ def _run(job_id: str): finally: if sem: sem.release() - if ordered_lock: - ordered_lock.release() + for lock in reversed(ordered_locks): + lock.release() _release_runner(job_id) diff --git a/pytorrent/static/styles.css b/pytorrent/static/styles.css index 2893202..49a6f8f 100644 --- a/pytorrent/static/styles.css +++ b/pytorrent/static/styles.css @@ -514,8 +514,13 @@ body { padding-bottom: 0; padding-top: 0; text-align: center; + vertical-align: middle; width: 34px; } +.torrent-table thead .sel input[type="checkbox"] { + display: block; + margin: 0 auto; +} .torrent-table .torrent-select-cell { align-items: center; display: flex;