"""Socket.IO event handlers and the background poller that pushes rTorrent state to clients."""

from __future__ import annotations

import logging
import threading

import psutil
from flask_socketio import emit, join_room, leave_room, disconnect

from ..config import POLL_INTERVAL
from .preferences import active_profile, get_profile
from .torrent_cache import torrent_cache
from .torrent_summary import cached_summary
from . import rtorrent, smart_queue, traffic_history, automation_rules, torrent_stats, auth

logger = logging.getLogger(__name__)

# Seconds between the heavier periodic jobs driven by the poller tick counter.
_STATS_REFRESH_SECONDS = 15 * 60
_PERIODIC_CHECK_SECONDS = 30


def _profile_room(profile_id: int) -> str:
    """Return the Socket.IO room name used for one profile's events."""
    return f"profile:{int(profile_id)}"


def _poller_profiles() -> list[dict]:
    """Return the profiles the background poller should refresh this cycle.

    Without auth only the active profile (if any) is polled; with auth enabled
    every row of ``rtorrent_profiles`` is returned, ordered by id.
    """
    # Note: Background polling has no browser session, so auth-enabled mode refreshes all profiles and emits only to per-profile rooms.
    if auth.enabled():
        from ..db import connect

        with connect() as conn:
            # NOTE(review): rows are returned exactly as fetchall() yields them; the
            # poller later calls profile.get(...), so connect() presumably installs
            # a dict-like row factory — confirm.
            return conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()
    profile = active_profile()
    return [profile] if profile else []


def _emit_profile(socketio, event: str, payload: dict, profile_id: int) -> None:
    """Emit ``event`` to the profile's room when auth is enabled, otherwise to everyone."""
    target = _profile_room(profile_id) if auth.enabled() else None
    if target:
        socketio.emit(event, payload, to=target)
    else:
        socketio.emit(event, payload)


def _ticks_per(seconds: int) -> int:
    """Return how many poller ticks make up ``seconds`` (never less than one)."""
    return max(1, int(seconds / POLL_INTERVAL))


def _patch_payload(diff: dict, pid: int) -> dict:
    """Return ``diff`` extended with a freshly computed summary for profile ``pid``."""
    return {**diff, "summary": cached_summary(pid, torrent_cache.snapshot(pid), force=True)}


def _requested_profile_id(data) -> int:
    """Extract the numeric ``profile_id`` from a client payload.

    Returns 0 when the payload is missing, is not a mapping, or carries an id
    that cannot be converted to ``int`` — client input must never raise here.
    """
    raw = data.get("profile_id") if isinstance(data, dict) else None
    try:
        return int(raw or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def _emit_system_stats(socketio, profile, pid: int) -> str | None:
    """Collect and emit ``system_stats`` for one profile.

    Returns ``None`` on success, or the error text after emitting
    ``rtorrent_error`` when anything in the collection/emit path fails.
    """
    try:
        status = rtorrent.system_status(profile)
        if bool(profile.get("is_remote")):
            # Remote profiles: no local CPU/RAM figures are attached.
            status["usage_source"] = "remote-hidden"
            status["usage_available"] = False
        else:
            status["cpu"] = psutil.cpu_percent(interval=None)
            status["ram"] = psutil.virtual_memory().percent
            status["usage_source"] = "local"
            status["usage_available"] = True
        status["profile_id"] = pid
        traffic_history.record(
            pid,
            status.get("down_rate", 0),
            status.get("up_rate", 0),
            status.get("total_down", 0),
            status.get("total_up", 0),
        )
        _emit_profile(socketio, "system_stats", status, pid)
    except Exception as exc:
        message = str(exc)
        _emit_profile(socketio, "rtorrent_error", {"profile_id": pid, "error": message}, pid)
        return message
    return None


def _run_periodic_checks(socketio, profile, pid: int) -> None:
    """Run the Smart Queue and automation checks for one profile and emit their results."""
    # NOTE(review): block nesting was reconstructed from whitespace-collapsed source.
    # Confirm that (a) the post-queue refresh is not meant to be nested under the
    # "enabled" check and (b) the automation check belongs under the same 30 s gate.
    try:
        result = smart_queue.check(profile, force=False)
        if result.get("enabled"):
            _emit_profile(socketio, "smart_queue_update", result, pid)
        if result.get("paused") or result.get("resumed") or result.get("resume_requested"):
            # Note: After Smart Queue changes, refresh cache immediately so the Downloading list does not wait for the next poller cycle.
            queue_diff = torrent_cache.refresh(profile)
            if queue_diff.get("ok"):
                _emit_profile(socketio, "torrent_patch", _patch_payload(queue_diff, pid), pid)
    except Exception as exc:
        _emit_profile(socketio, "smart_queue_update", {"ok": False, "error": str(exc)}, pid)

    try:
        auto_result = automation_rules.check(profile, force=False)
        if auto_result.get("applied"):
            _emit_profile(socketio, "automation_update", auto_result, pid)
    except Exception as exc:
        _emit_profile(socketio, "automation_update", {"ok": False, "error": str(exc)}, pid)


def _poll_profile(socketio, profile, tick: int, *, refresh_stats: bool, run_checks: bool) -> None:
    """Run one polling cycle for a single profile and emit the resulting events.

    ``refresh_stats`` and ``run_checks`` say whether this tick is due for the
    15-minute statistics refresh and the 30-second queue/automation checks.
    """
    pid = int(profile["id"])
    diff = torrent_cache.refresh(profile)
    heartbeat = {
        "ok": bool(diff.get("ok")),
        "profile_id": pid,
        "tick": tick,
        "error": diff.get("error", ""),
    }
    if not diff.get("ok"):
        _emit_profile(socketio, "rtorrent_error", diff, pid)
    elif diff["added"] or diff["updated"] or diff["removed"]:
        # Only push a patch when something actually changed.
        _emit_profile(socketio, "torrent_patch", _patch_payload(diff, pid), pid)

    # The status call decides the final heartbeat state: success marks it ok even
    # if the cache refresh above failed (the refresh error text is kept).
    stats_error = _emit_system_stats(socketio, profile, pid)
    heartbeat["ok"] = stats_error is None
    if stats_error is not None:
        heartbeat["error"] = stats_error

    if refresh_stats:
        # Note: Queue heavier torrent statistics outside the fast system_stats poller.
        torrent_stats.queue_refresh(
            socketio,
            profile,
            force=False,
            room=_profile_room(pid) if auth.enabled() else None,
        )
    if run_checks:
        _run_periodic_checks(socketio, profile, pid)

    _emit_profile(socketio, "heartbeat", heartbeat, pid)


_started = False
_start_lock = threading.Lock()


def register_socketio_handlers(socketio):
    """Register the Socket.IO event handlers on ``socketio`` and start the background poller."""

    def poller():
        """Poll every profile forever, sleeping ``POLL_INTERVAL`` between cycles."""
        stats_every = _ticks_per(_STATS_REFRESH_SECONDS)
        checks_every = _ticks_per(_PERIODIC_CHECK_SECONDS)
        tick = 0
        while True:
            # The poller is started only once (see ensure_poller_started), so an
            # exception escaping this loop would stop all polling until restart.
            # Log and carry on instead.
            try:
                profiles = _poller_profiles()
            except Exception:
                logger.exception("Could not load profiles for polling")
                profiles = []
            for profile in profiles:
                if not profile:
                    continue
                try:
                    _poll_profile(
                        socketio,
                        profile,
                        tick,
                        refresh_stats=tick % stats_every == 0,
                        run_checks=tick % checks_every == 0,
                    )
                except Exception:
                    # Keep one failing profile from starving the others.
                    logger.exception("Polling cycle failed for a profile")
            tick += 1
            socketio.sleep(POLL_INTERVAL)

    def ensure_poller_started():
        """Start the poller background task once, guarded by ``_start_lock``."""
        global _started
        with _start_lock:
            if not _started:
                # Note: The poller starts with the app, so Smart Queue and automations work without an open UI.
                socketio.start_background_task(poller)
                _started = True

    ensure_poller_started()

    @socketio.on("connect")
    def handle_connect():
        """Authenticate the client, join its profile room and send the initial snapshot."""
        ensure_poller_started()
        if auth.enabled() and not auth.current_user_id():
            # Note: Socket.IO uses the same session auth as REST API; unauthenticated clients are disconnected.
            disconnect()
            return False
        profile = active_profile()
        if profile:
            join_room(_profile_room(profile["id"]))
        emit("connected", {"ok": True, "profile": profile})
        if not profile:
            # Note: Fresh installs or users without profile access get setup state, not another user's snapshot.
            emit("profile_required", {"ok": True, "profiles": []})
            return
        rows = torrent_cache.snapshot(profile["id"])
        emit(
            "torrent_snapshot",
            {
                "profile_id": profile["id"],
                "torrents": rows,
                "summary": cached_summary(profile["id"], rows),
            },
        )

    @socketio.on("select_profile")
    def handle_select_profile(data):
        """Move the client to the requested profile's room and send a fresh snapshot."""
        if auth.enabled() and not auth.current_user_id():
            disconnect()
            return
        old_profile = active_profile()
        if old_profile:
            leave_room(_profile_room(old_profile["id"]))
        profile_id = _requested_profile_id(data)
        if not profile_id:
            # Note: Ignore empty profile selections created before the first rTorrent profile exists.
            emit("profile_required", {"ok": True, "profiles": []})
            return
        profile = get_profile(profile_id)
        if not profile:
            emit("rtorrent_error", {"error": "Profile access denied or profile does not exist"})
            return
        join_room(_profile_room(profile_id))
        diff = torrent_cache.refresh(profile)
        rows = torrent_cache.snapshot(profile_id)
        emit(
            "torrent_snapshot",
            {
                "profile_id": profile_id,
                "torrents": rows,
                "summary": cached_summary(profile_id, rows, force=True),
                "error": diff.get("error", ""),
            },
        )