Files
pyTorrent/pytorrent/services/websocket.py
Mateusz Gruszczyński dc1cac4e6f add auth support
2026-05-06 08:38:07 +02:00

142 lines
7.1 KiB
Python

from __future__ import annotations
import threading
import psutil
from flask_socketio import emit, join_room, leave_room, disconnect
from ..config import POLL_INTERVAL
from .preferences import active_profile, get_profile
from .torrent_cache import torrent_cache
from .torrent_summary import cached_summary
from . import rtorrent, smart_queue, traffic_history, automation_rules, torrent_stats, auth
def _profile_room(profile_id: int) -> str:
return f"profile:{int(profile_id)}"
def _poller_profiles() -> list[dict]:
# Note: Background polling has no browser session, so auth-enabled mode refreshes all profiles and emits only to per-profile rooms.
if not auth.enabled():
profile = active_profile()
return [profile] if profile else []
from ..db import connect
with connect() as conn:
return conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()
def _emit_profile(socketio, event: str, payload: dict, profile_id: int) -> None:
target = _profile_room(profile_id) if auth.enabled() else None
socketio.emit(event, payload, to=target) if target else socketio.emit(event, payload)
_started = False
_start_lock = threading.Lock()
def register_socketio_handlers(socketio):
def poller():
tick = 0
while True:
for profile in _poller_profiles():
if not profile:
continue
pid = int(profile["id"])
diff = torrent_cache.refresh(profile)
heartbeat = {"ok": bool(diff.get("ok")), "profile_id": pid, "tick": tick, "error": diff.get("error", "")}
if diff.get("ok") and (diff["added"] or diff["updated"] or diff["removed"]):
_emit_profile(socketio, "torrent_patch", {**diff, "summary": cached_summary(pid, torrent_cache.snapshot(pid), force=True)}, pid)
elif not diff.get("ok"):
_emit_profile(socketio, "rtorrent_error", diff, pid)
try:
status = rtorrent.system_status(profile)
if bool(profile.get("is_remote")):
status["usage_source"] = "remote-hidden"
status["usage_available"] = False
else:
status["cpu"] = psutil.cpu_percent(interval=None)
status["ram"] = psutil.virtual_memory().percent
status["usage_source"] = "local"
status["usage_available"] = True
status["profile_id"] = pid
traffic_history.record(pid, status.get("down_rate", 0), status.get("up_rate", 0), status.get("total_down", 0), status.get("total_up", 0))
_emit_profile(socketio, "system_stats", status, pid)
heartbeat["ok"] = True
except Exception as exc:
heartbeat["ok"] = False
heartbeat["error"] = str(exc)
_emit_profile(socketio, "rtorrent_error", {"profile_id": pid, "error": str(exc)}, pid)
if tick % max(1, int(15 * 60 / POLL_INTERVAL)) == 0:
# Note: Queue heavier torrent statistics outside the fast system_stats poller.
torrent_stats.queue_refresh(socketio, profile, force=False, room=_profile_room(pid) if auth.enabled() else None)
if tick % max(1, int(30 / POLL_INTERVAL)) == 0:
try:
result = smart_queue.check(profile, force=False)
if result.get("enabled"):
_emit_profile(socketio, "smart_queue_update", result, pid)
if result.get("paused") or result.get("resumed") or result.get("resume_requested"):
# Note: After Smart Queue changes, refresh cache immediately so the Downloading list does not wait for the next poller cycle.
queue_diff = torrent_cache.refresh(profile)
if queue_diff.get("ok"):
_emit_profile(socketio, "torrent_patch", {**queue_diff, "summary": cached_summary(pid, torrent_cache.snapshot(pid), force=True)}, pid)
except Exception as exc:
_emit_profile(socketio, "smart_queue_update", {"ok": False, "error": str(exc)}, pid)
try:
auto_result = automation_rules.check(profile, force=False)
if auto_result.get("applied"):
_emit_profile(socketio, "automation_update", auto_result, pid)
except Exception as exc:
_emit_profile(socketio, "automation_update", {"ok": False, "error": str(exc)}, pid)
_emit_profile(socketio, "heartbeat", heartbeat, pid)
tick += 1
socketio.sleep(POLL_INTERVAL)
def ensure_poller_started():
global _started
with _start_lock:
if not _started:
# Note: The poller starts with the app, so Smart Queue and automations work without an open UI.
socketio.start_background_task(poller)
_started = True
ensure_poller_started()
@socketio.on("connect")
def handle_connect():
ensure_poller_started()
if auth.enabled() and not auth.current_user_id():
# Note: Socket.IO uses the same session auth as REST API; unauthenticated clients are disconnected.
disconnect()
return False
profile = active_profile()
if profile:
join_room(_profile_room(profile["id"]))
emit("connected", {"ok": True, "profile": profile})
if not profile:
# Note: Fresh installs or users without profile access get setup state, not another user's snapshot.
emit("profile_required", {"ok": True, "profiles": []})
return
rows = torrent_cache.snapshot(profile["id"])
emit("torrent_snapshot", {"profile_id": profile["id"], "torrents": rows, "summary": cached_summary(profile["id"], rows)})
@socketio.on("select_profile")
def handle_select_profile(data):
if auth.enabled() and not auth.current_user_id():
disconnect()
return
old_profile = active_profile()
if old_profile:
leave_room(_profile_room(old_profile["id"]))
profile_id = int((data or {}).get("profile_id") or 0)
if not profile_id:
# Note: Ignore empty profile selections created before the first rTorrent profile exists.
emit("profile_required", {"ok": True, "profiles": []})
return
profile = get_profile(profile_id)
if not profile:
emit("rtorrent_error", {"error": "Profile access denied or profile does not exist"})
return
join_room(_profile_room(profile_id))
diff = torrent_cache.refresh(profile)
rows = torrent_cache.snapshot(profile_id)
emit("torrent_snapshot", {"profile_id": profile_id, "torrents": rows, "summary": cached_summary(profile_id, rows, force=True), "error": diff.get("error", "")})