speeds
This commit is contained in:
159
pytorrent/services/speed_peaks.py
Normal file
159
pytorrent/services/speed_peaks.py
Normal file
@@ -0,0 +1,159 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from ..db import connect, utcnow
|
||||
from .rtorrent import human_rate
|
||||
|
||||
_SESSION_STARTED_AT = utcnow()
|
||||
_CACHE: dict[int, dict[str, Any]] = {}
|
||||
_LOADED = False
|
||||
_LOCK = threading.Lock()
|
||||
|
||||
|
||||
def _empty_peak(profile_id: int, all_time: dict[str, Any] | None = None) -> dict[str, Any]:
|
||||
# Notatka: jedna struktura w pamięci trzyma bieżącą sesję i rekord ogólny dla profilu rTorrent.
|
||||
all_time = all_time or {}
|
||||
return {
|
||||
"profile_id": int(profile_id),
|
||||
"session_started_at": _SESSION_STARTED_AT,
|
||||
"session_down_peak": 0,
|
||||
"session_up_peak": 0,
|
||||
"session_down_peak_at": None,
|
||||
"session_up_peak_at": None,
|
||||
"all_time_down_peak": int(all_time.get("all_time_down_peak") or 0),
|
||||
"all_time_up_peak": int(all_time.get("all_time_up_peak") or 0),
|
||||
"all_time_down_peak_at": all_time.get("all_time_down_peak_at"),
|
||||
"all_time_up_peak_at": all_time.get("all_time_up_peak_at"),
|
||||
}
|
||||
|
||||
|
||||
def load_cache() -> None:
|
||||
# Notatka: rekordy ogólne są ładowane przy starcie aplikacji, a rekord sesji zaczyna się od zera.
|
||||
global _LOADED
|
||||
with _LOCK:
|
||||
if _LOADED:
|
||||
return
|
||||
with connect() as conn:
|
||||
rows = conn.execute("SELECT * FROM transfer_speed_peaks").fetchall()
|
||||
for row in rows:
|
||||
profile_id = int(row.get("profile_id") or 0)
|
||||
if profile_id:
|
||||
_CACHE[profile_id] = _empty_peak(profile_id, row)
|
||||
_LOADED = True
|
||||
|
||||
|
||||
def _ensure_profile(profile_id: int) -> dict[str, Any]:
|
||||
# Notatka: leniwe ładowanie chroni nowe profile dodane po starcie przed pustymi rekordami.
|
||||
profile_id = int(profile_id)
|
||||
item = _CACHE.get(profile_id)
|
||||
if item:
|
||||
return item
|
||||
with connect() as conn:
|
||||
row = conn.execute("SELECT * FROM transfer_speed_peaks WHERE profile_id=?", (profile_id,)).fetchone()
|
||||
item = _empty_peak(profile_id, row)
|
||||
_CACHE[profile_id] = item
|
||||
return item
|
||||
|
||||
|
||||
def _persist(item: dict[str, Any]) -> None:
|
||||
# Notatka: SQLite dostaje zapis tylko wtedy, gdy pojawił się nowy rekord sesji lub rekord ogólny.
|
||||
now = utcnow()
|
||||
with connect() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO transfer_speed_peaks(
|
||||
profile_id, session_started_at, session_down_peak, session_up_peak,
|
||||
session_down_peak_at, session_up_peak_at, all_time_down_peak,
|
||||
all_time_up_peak, all_time_down_peak_at, all_time_up_peak_at,
|
||||
created_at, updated_at
|
||||
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)
|
||||
ON CONFLICT(profile_id) DO UPDATE SET
|
||||
session_started_at=excluded.session_started_at,
|
||||
session_down_peak=excluded.session_down_peak,
|
||||
session_up_peak=excluded.session_up_peak,
|
||||
session_down_peak_at=excluded.session_down_peak_at,
|
||||
session_up_peak_at=excluded.session_up_peak_at,
|
||||
all_time_down_peak=excluded.all_time_down_peak,
|
||||
all_time_up_peak=excluded.all_time_up_peak,
|
||||
all_time_down_peak_at=excluded.all_time_down_peak_at,
|
||||
all_time_up_peak_at=excluded.all_time_up_peak_at,
|
||||
updated_at=excluded.updated_at
|
||||
""",
|
||||
(
|
||||
int(item["profile_id"]),
|
||||
item["session_started_at"],
|
||||
int(item["session_down_peak"]),
|
||||
int(item["session_up_peak"]),
|
||||
item.get("session_down_peak_at"),
|
||||
item.get("session_up_peak_at"),
|
||||
int(item["all_time_down_peak"]),
|
||||
int(item["all_time_up_peak"]),
|
||||
item.get("all_time_down_peak_at"),
|
||||
item.get("all_time_up_peak_at"),
|
||||
now,
|
||||
now,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _public(item: dict[str, Any]) -> dict[str, Any]:
|
||||
# Notatka: frontend dostaje zarówno bajty/s, jak i gotowe etykiety w stylu istniejących prędkości.
|
||||
return {
|
||||
"session_started_at": item["session_started_at"],
|
||||
"session": {
|
||||
"down": int(item["session_down_peak"]),
|
||||
"up": int(item["session_up_peak"]),
|
||||
"down_h": human_rate(int(item["session_down_peak"])),
|
||||
"up_h": human_rate(int(item["session_up_peak"])),
|
||||
"down_at": item.get("session_down_peak_at"),
|
||||
"up_at": item.get("session_up_peak_at"),
|
||||
},
|
||||
"all_time": {
|
||||
"down": int(item["all_time_down_peak"]),
|
||||
"up": int(item["all_time_up_peak"]),
|
||||
"down_h": human_rate(int(item["all_time_down_peak"])),
|
||||
"up_h": human_rate(int(item["all_time_up_peak"])),
|
||||
"down_at": item.get("all_time_down_peak_at"),
|
||||
"up_at": item.get("all_time_up_peak_at"),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def record(profile_id: int, down_rate: int = 0, up_rate: int = 0) -> dict[str, Any]:
|
||||
# Notatka: poller wywołuje tę funkcję w tle; baza jest aktualizowana tylko po przebiciu rekordu.
|
||||
load_cache()
|
||||
down_rate = max(0, int(down_rate or 0))
|
||||
up_rate = max(0, int(up_rate or 0))
|
||||
measured_at = utcnow()
|
||||
changed = False
|
||||
with _LOCK:
|
||||
item = _ensure_profile(int(profile_id))
|
||||
if down_rate > int(item["session_down_peak"]):
|
||||
item["session_down_peak"] = down_rate
|
||||
item["session_down_peak_at"] = measured_at
|
||||
changed = True
|
||||
if up_rate > int(item["session_up_peak"]):
|
||||
item["session_up_peak"] = up_rate
|
||||
item["session_up_peak_at"] = measured_at
|
||||
changed = True
|
||||
if down_rate > int(item["all_time_down_peak"]):
|
||||
item["all_time_down_peak"] = down_rate
|
||||
item["all_time_down_peak_at"] = measured_at
|
||||
changed = True
|
||||
if up_rate > int(item["all_time_up_peak"]):
|
||||
item["all_time_up_peak"] = up_rate
|
||||
item["all_time_up_peak_at"] = measured_at
|
||||
changed = True
|
||||
result = _public(item)
|
||||
if changed:
|
||||
_persist(item)
|
||||
return result
|
||||
|
||||
|
||||
def current(profile_id: int) -> dict[str, Any]:
|
||||
# Notatka: REST API może pokazać ostatni znany rekord bez wymuszania nowego pomiaru.
|
||||
load_cache()
|
||||
with _LOCK:
|
||||
return _public(_ensure_profile(int(profile_id)))
|
||||
Reference in New Issue
Block a user