from __future__ import annotations

import threading
from typing import Any

from ..db import connect, utcnow
from .rtorrent import human_rate

_SESSION_STARTED_AT = utcnow()
_CACHE: dict[int, dict[str, Any]] = {}
_LOADED = False
_LOCK = threading.Lock()


def _empty_peak(profile_id: int, all_time: dict[str, Any] | None = None) -> dict[str, Any]:
    # Note: a single in-memory structure holds both the current-session and the
    # all-time record for an rTorrent profile.
    all_time = all_time or {}
    return {
        "profile_id": int(profile_id),
        "session_started_at": _SESSION_STARTED_AT,
        "session_down_peak": 0,
        "session_up_peak": 0,
        "session_down_peak_at": None,
        "session_up_peak_at": None,
        "all_time_down_peak": int(all_time.get("all_time_down_peak") or 0),
        "all_time_up_peak": int(all_time.get("all_time_up_peak") or 0),
        "all_time_down_peak_at": all_time.get("all_time_down_peak_at"),
        "all_time_up_peak_at": all_time.get("all_time_up_peak_at"),
    }


def load_cache() -> None:
    # Note: all-time records are loaded once at application startup; the
    # session record always starts from zero.
    global _LOADED
    with _LOCK:
        if _LOADED:
            return
        with connect() as conn:
            rows = conn.execute("SELECT * FROM transfer_speed_peaks").fetchall()
        for row in rows:
            profile_id = int(row.get("profile_id") or 0)
            if profile_id:
                _CACHE[profile_id] = _empty_peak(profile_id, row)
        _LOADED = True


def _ensure_profile(profile_id: int) -> dict[str, Any]:
    # Note: lazy loading covers profiles created after startup, so they do not
    # end up with empty records.
    profile_id = int(profile_id)
    item = _CACHE.get(profile_id)
    if item:
        return item
    with connect() as conn:
        row = conn.execute("SELECT * FROM transfer_speed_peaks WHERE profile_id=?", (profile_id,)).fetchone()
    item = _empty_peak(profile_id, row)
    _CACHE[profile_id] = item
    return item


def _persist(item: dict[str, Any]) -> None:
    # Note: SQLite is only written when a new session or all-time peak has
    # actually been set.
    now = utcnow()
    with connect() as conn:
        conn.execute(
            """
            INSERT INTO transfer_speed_peaks(
                profile_id, session_started_at, session_down_peak, session_up_peak,
                session_down_peak_at, session_up_peak_at, all_time_down_peak,
                all_time_up_peak, all_time_down_peak_at, all_time_up_peak_at,
                created_at, updated_at
            ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)
            ON CONFLICT(profile_id) DO UPDATE SET
                session_started_at=excluded.session_started_at,
                session_down_peak=excluded.session_down_peak,
                session_up_peak=excluded.session_up_peak,
                session_down_peak_at=excluded.session_down_peak_at,
                session_up_peak_at=excluded.session_up_peak_at,
                all_time_down_peak=excluded.all_time_down_peak,
                all_time_up_peak=excluded.all_time_up_peak,
                all_time_down_peak_at=excluded.all_time_down_peak_at,
                all_time_up_peak_at=excluded.all_time_up_peak_at,
                updated_at=excluded.updated_at
            """,
            (
                int(item["profile_id"]),
                item["session_started_at"],
                int(item["session_down_peak"]),
                int(item["session_up_peak"]),
                item.get("session_down_peak_at"),
                item.get("session_up_peak_at"),
                int(item["all_time_down_peak"]),
                int(item["all_time_up_peak"]),
                item.get("all_time_down_peak_at"),
                item.get("all_time_up_peak_at"),
                now,
                now,
            ),
        )


def _public(item: dict[str, Any]) -> dict[str, Any]:
    # Note: the frontend gets both raw bytes/s and pre-formatted labels
    # matching the existing speed display.
    return {
        "session_started_at": item["session_started_at"],
        "session": {
            "down": int(item["session_down_peak"]),
            "up": int(item["session_up_peak"]),
            "down_h": human_rate(int(item["session_down_peak"])),
            "up_h": human_rate(int(item["session_up_peak"])),
            "down_at": item.get("session_down_peak_at"),
            "up_at": item.get("session_up_peak_at"),
        },
        "all_time": {
            "down": int(item["all_time_down_peak"]),
            "up": int(item["all_time_up_peak"]),
            "down_h": human_rate(int(item["all_time_down_peak"])),
            "up_h": human_rate(int(item["all_time_up_peak"])),
            "down_at": item.get("all_time_down_peak_at"),
            "up_at": item.get("all_time_up_peak_at"),
        },
    }
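
# For reference, _public() yields a payload of roughly this shape (values are
# illustrative; the exact "*_h" labels depend on the app's human_rate() helper):
#   {"session_started_at": "...",
#    "session":  {"down": 1250000, "up": 300000, "down_h": "1.19 MiB/s", ...},
#    "all_time": {"down": 2500000, "up": 600000, "down_h": "2.38 MiB/s", ...}}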


def record(profile_id: int, down_rate: int = 0, up_rate: int = 0) -> dict[str, Any]:
    # Note: the background poller calls this function; the database is only
    # updated when a peak is actually beaten.
    load_cache()
    down_rate = max(0, int(down_rate or 0))
    up_rate = max(0, int(up_rate or 0))
    measured_at = utcnow()
    changed = False
    with _LOCK:
        item = _ensure_profile(int(profile_id))
        if down_rate > int(item["session_down_peak"]):
            item["session_down_peak"] = down_rate
            item["session_down_peak_at"] = measured_at
            changed = True
        if up_rate > int(item["session_up_peak"]):
            item["session_up_peak"] = up_rate
            item["session_up_peak_at"] = measured_at
            changed = True
        if down_rate > int(item["all_time_down_peak"]):
            item["all_time_down_peak"] = down_rate
            item["all_time_down_peak_at"] = measured_at
            changed = True
        if up_rate > int(item["all_time_up_peak"]):
            item["all_time_up_peak"] = up_rate
            item["all_time_up_peak_at"] = measured_at
            changed = True
        result = _public(item)
    if changed:
        _persist(item)
    return result


def current(profile_id: int) -> dict[str, Any]:
    # Note: the REST API can show the last known peaks without forcing a new
    # measurement.
    load_cache()
    with _LOCK:
        return _public(_ensure_profile(int(profile_id)))
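

if __name__ == "__main__":
    # Minimal usage sketch, not part of the background poller: it assumes the
    # application's SQLite schema (the transfer_speed_peaks table) is already
    # initialized. The profile id and bytes/s rates below are illustrative.
    peaks = record(profile_id=1, down_rate=1_250_000, up_rate=300_000)
    print("session down peak:", peaks["session"]["down_h"])
    # current() reports the last known peaks without recording a new sample.
    print("all-time up peak:", current(1)["all_time"]["up_h"])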