from __future__ import annotations import threading from typing import Any from ..db import connect, utcnow from .rtorrent import human_rate _SESSION_STARTED_AT = utcnow() _CACHE: dict[int, dict[str, Any]] = {} _LOADED = False _LOCK = threading.Lock() def _empty_peak(profile_id: int, all_time: dict[str, Any] | None = None) -> dict[str, Any]: # Notatka: jedna struktura w pamięci trzyma bieżącą sesję i rekord ogólny dla profilu rTorrent. all_time = all_time or {} return { "profile_id": int(profile_id), "session_started_at": _SESSION_STARTED_AT, "session_down_peak": 0, "session_up_peak": 0, "session_down_peak_at": None, "session_up_peak_at": None, "all_time_down_peak": int(all_time.get("all_time_down_peak") or 0), "all_time_up_peak": int(all_time.get("all_time_up_peak") or 0), "all_time_down_peak_at": all_time.get("all_time_down_peak_at"), "all_time_up_peak_at": all_time.get("all_time_up_peak_at"), } def load_cache() -> None: # Notatka: rekordy ogólne są ładowane przy starcie aplikacji, a rekord sesji zaczyna się od zera. global _LOADED with _LOCK: if _LOADED: return with connect() as conn: rows = conn.execute("SELECT * FROM transfer_speed_peaks").fetchall() for row in rows: profile_id = int(row.get("profile_id") or 0) if profile_id: _CACHE[profile_id] = _empty_peak(profile_id, row) _LOADED = True def _ensure_profile(profile_id: int) -> dict[str, Any]: # Notatka: leniwe ładowanie chroni nowe profile dodane po starcie przed pustymi rekordami. profile_id = int(profile_id) item = _CACHE.get(profile_id) if item: return item with connect() as conn: row = conn.execute("SELECT * FROM transfer_speed_peaks WHERE profile_id=?", (profile_id,)).fetchone() item = _empty_peak(profile_id, row) _CACHE[profile_id] = item return item def _persist(item: dict[str, Any]) -> None: # Notatka: SQLite dostaje zapis tylko wtedy, gdy pojawił się nowy rekord sesji lub rekord ogólny. now = utcnow() with connect() as conn: conn.execute( """ INSERT INTO transfer_speed_peaks( profile_id, session_started_at, session_down_peak, session_up_peak, session_down_peak_at, session_up_peak_at, all_time_down_peak, all_time_up_peak, all_time_down_peak_at, all_time_up_peak_at, created_at, updated_at ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT(profile_id) DO UPDATE SET session_started_at=excluded.session_started_at, session_down_peak=excluded.session_down_peak, session_up_peak=excluded.session_up_peak, session_down_peak_at=excluded.session_down_peak_at, session_up_peak_at=excluded.session_up_peak_at, all_time_down_peak=excluded.all_time_down_peak, all_time_up_peak=excluded.all_time_up_peak, all_time_down_peak_at=excluded.all_time_down_peak_at, all_time_up_peak_at=excluded.all_time_up_peak_at, updated_at=excluded.updated_at """, ( int(item["profile_id"]), item["session_started_at"], int(item["session_down_peak"]), int(item["session_up_peak"]), item.get("session_down_peak_at"), item.get("session_up_peak_at"), int(item["all_time_down_peak"]), int(item["all_time_up_peak"]), item.get("all_time_down_peak_at"), item.get("all_time_up_peak_at"), now, now, ), ) def _public(item: dict[str, Any]) -> dict[str, Any]: # Notatka: frontend dostaje zarówno bajty/s, jak i gotowe etykiety w stylu istniejących prędkości. return { "session_started_at": item["session_started_at"], "session": { "down": int(item["session_down_peak"]), "up": int(item["session_up_peak"]), "down_h": human_rate(int(item["session_down_peak"])), "up_h": human_rate(int(item["session_up_peak"])), "down_at": item.get("session_down_peak_at"), "up_at": item.get("session_up_peak_at"), }, "all_time": { "down": int(item["all_time_down_peak"]), "up": int(item["all_time_up_peak"]), "down_h": human_rate(int(item["all_time_down_peak"])), "up_h": human_rate(int(item["all_time_up_peak"])), "down_at": item.get("all_time_down_peak_at"), "up_at": item.get("all_time_up_peak_at"), }, } def record(profile_id: int, down_rate: int = 0, up_rate: int = 0) -> dict[str, Any]: # Notatka: poller wywołuje tę funkcję w tle; baza jest aktualizowana tylko po przebiciu rekordu. load_cache() down_rate = max(0, int(down_rate or 0)) up_rate = max(0, int(up_rate or 0)) measured_at = utcnow() changed = False with _LOCK: item = _ensure_profile(int(profile_id)) if down_rate > int(item["session_down_peak"]): item["session_down_peak"] = down_rate item["session_down_peak_at"] = measured_at changed = True if up_rate > int(item["session_up_peak"]): item["session_up_peak"] = up_rate item["session_up_peak_at"] = measured_at changed = True if down_rate > int(item["all_time_down_peak"]): item["all_time_down_peak"] = down_rate item["all_time_down_peak_at"] = measured_at changed = True if up_rate > int(item["all_time_up_peak"]): item["all_time_up_peak"] = up_rate item["all_time_up_peak_at"] = measured_at changed = True result = _public(item) if changed: _persist(item) return result def current(profile_id: int) -> dict[str, Any]: # Notatka: REST API może pokazać ostatni znany rekord bez wymuszania nowego pomiaru. load_cache() with _LOCK: return _public(_ensure_profile(int(profile_id)))