245 lines
11 KiB
Python
245 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from ..db import connect, utcnow
|
|
from ..config import POLL_INTERVAL, MIN_POLL_INTERVAL_SECONDS
|
|
|
|
DEFAULTS = {
|
|
"adaptive_enabled": True,
|
|
"safe_fallback_enabled": True,
|
|
"active_interval_seconds": 5.0,
|
|
"idle_interval_seconds": 15.0,
|
|
"error_interval_seconds": 30.0,
|
|
"torrent_list_interval_seconds": 5.0,
|
|
"system_stats_interval_seconds": 5.0,
|
|
"tracker_stats_interval_seconds": 300.0,
|
|
"disk_stats_interval_seconds": 60.0,
|
|
"queue_stats_interval_seconds": 15.0,
|
|
"slow_stats_interval_seconds": 60.0,
|
|
"heartbeat_interval_seconds": 15.0,
|
|
"emit_heartbeat_on_change": True,
|
|
"slow_response_threshold_ms": 8000.0,
|
|
"slowdown_multiplier": 2.0,
|
|
"recovery_after_errors": 3,
|
|
}
|
|
|
|
|
|
def _key(profile_id: int) -> str:
|
|
return f"poller.settings.{int(profile_id)}"
|
|
|
|
|
|
def _state_key(profile_id: int) -> str:
|
|
return f"poller.runtime.{int(profile_id)}"
|
|
|
|
|
|
def _coerce_float(value: Any, default: float, lo: float, hi: float) -> float:
|
|
try:
|
|
number = float(value)
|
|
except Exception:
|
|
return default
|
|
return max(lo, min(hi, number))
|
|
|
|
|
|
def normalize_settings(data: dict | None) -> dict:
|
|
raw = {**DEFAULTS, **(data or {})}
|
|
settings = {
|
|
"adaptive_enabled": bool(raw.get("adaptive_enabled")),
|
|
"safe_fallback_enabled": bool(raw.get("safe_fallback_enabled", True)),
|
|
"active_interval_seconds": _coerce_float(raw.get("active_interval_seconds"), DEFAULTS["active_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 30.0),
|
|
"idle_interval_seconds": _coerce_float(raw.get("idle_interval_seconds"), DEFAULTS["idle_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 120.0),
|
|
"error_interval_seconds": _coerce_float(raw.get("error_interval_seconds"), DEFAULTS["error_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 300.0),
|
|
"torrent_list_interval_seconds": _coerce_float(raw.get("torrent_list_interval_seconds"), DEFAULTS["torrent_list_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 120.0),
|
|
"system_stats_interval_seconds": _coerce_float(raw.get("system_stats_interval_seconds"), DEFAULTS["system_stats_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 120.0),
|
|
"tracker_stats_interval_seconds": _coerce_float(raw.get("tracker_stats_interval_seconds"), DEFAULTS["tracker_stats_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 1800.0),
|
|
"disk_stats_interval_seconds": _coerce_float(raw.get("disk_stats_interval_seconds"), DEFAULTS["disk_stats_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 1800.0),
|
|
"queue_stats_interval_seconds": _coerce_float(raw.get("queue_stats_interval_seconds"), DEFAULTS["queue_stats_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 1800.0),
|
|
"slow_stats_interval_seconds": _coerce_float(raw.get("slow_stats_interval_seconds"), DEFAULTS["slow_stats_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 1800.0),
|
|
"heartbeat_interval_seconds": _coerce_float(raw.get("heartbeat_interval_seconds"), DEFAULTS["heartbeat_interval_seconds"], MIN_POLL_INTERVAL_SECONDS, 300.0),
|
|
"emit_heartbeat_on_change": bool(raw.get("emit_heartbeat_on_change")),
|
|
"slow_response_threshold_ms": _coerce_float(raw.get("slow_response_threshold_ms"), DEFAULTS["slow_response_threshold_ms"], 100.0, 60000.0),
|
|
"slowdown_multiplier": _coerce_float(raw.get("slowdown_multiplier"), DEFAULTS["slowdown_multiplier"], 1.0, 10.0),
|
|
"recovery_after_errors": int(_coerce_float(raw.get("recovery_after_errors"), 3, 1, 20)),
|
|
}
|
|
if settings["safe_fallback_enabled"]:
|
|
for key in ("active_interval_seconds", "idle_interval_seconds", "error_interval_seconds", "torrent_list_interval_seconds", "system_stats_interval_seconds", "queue_stats_interval_seconds"):
|
|
if settings[key] <= 0:
|
|
settings[key] = DEFAULTS[key]
|
|
return settings
|
|
|
|
|
|
def get_settings(profile_id: int) -> dict:
|
|
with connect() as conn:
|
|
row = conn.execute("SELECT value FROM app_settings WHERE key=?", (_key(profile_id),)).fetchone()
|
|
try:
|
|
data = json.loads(row.get("value") or "{}") if row else {}
|
|
except Exception:
|
|
data = {}
|
|
return normalize_settings(data)
|
|
|
|
|
|
def save_settings(profile_id: int, data: dict) -> dict:
|
|
settings = normalize_settings(data)
|
|
with connect() as conn:
|
|
conn.execute("INSERT OR REPLACE INTO app_settings(key,value) VALUES(?,?)", (_key(profile_id), json.dumps(settings)))
|
|
return settings
|
|
|
|
|
|
@dataclass
|
|
class ProfilePollState:
|
|
profile_id: int
|
|
last_fast_at: float = 0.0
|
|
last_system_at: float = 0.0
|
|
last_slow_at: float = 0.0
|
|
last_tracker_at: float = 0.0
|
|
last_disk_at: float = 0.0
|
|
last_queue_at: float = 0.0
|
|
last_heartbeat_at: float = 0.0
|
|
last_ok: bool = True
|
|
last_active: bool = False
|
|
last_error: str = ""
|
|
last_tick_ms: float = 0.0
|
|
last_tick_started_at: float = 0.0
|
|
last_tick_gap_ms: float = 0.0
|
|
effective_interval_seconds: float = 0.0
|
|
tick_count: int = 0
|
|
sleep_hint: float = 1.0
|
|
error_count: int = 0
|
|
slow_count: int = 0
|
|
skipped_emissions: int = 0
|
|
emitted_payload_size: int = 0
|
|
rtorrent_call_count: int = 0
|
|
adaptive_mode: str = "normal"
|
|
slow_task_running: bool = False
|
|
system_task_running: bool = False
|
|
stats: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
_STATES: dict[int, ProfilePollState] = {}
|
|
|
|
|
|
def state_for(profile_id: int) -> ProfilePollState:
|
|
profile_id = int(profile_id)
|
|
state = _STATES.get(profile_id)
|
|
if state is None:
|
|
state = ProfilePollState(profile_id=profile_id)
|
|
_STATES[profile_id] = state
|
|
return state
|
|
|
|
|
|
def interval_for(settings: dict, state: ProfilePollState) -> float:
|
|
if not settings.get("adaptive_enabled"):
|
|
return float(settings["active_interval_seconds"])
|
|
if not state.last_ok:
|
|
return float(settings["error_interval_seconds"])
|
|
base = float(settings["active_interval_seconds"] if state.last_active else settings["idle_interval_seconds"])
|
|
if state.adaptive_mode == "slowdown":
|
|
return min(float(settings["error_interval_seconds"]), base * float(settings.get("slowdown_multiplier") or 2.0))
|
|
return base
|
|
|
|
|
|
def effective_fast_interval(settings: dict, state: ProfilePollState) -> float:
|
|
return max(MIN_POLL_INTERVAL_SECONDS, interval_for(settings, state), float(settings.get("torrent_list_interval_seconds") or DEFAULTS["torrent_list_interval_seconds"]))
|
|
|
|
|
|
def should_fast_poll(now: float, settings: dict, state: ProfilePollState) -> bool:
|
|
return (now - state.last_fast_at) >= effective_fast_interval(settings, state)
|
|
|
|
|
|
def should_system_poll(now: float, settings: dict, state: ProfilePollState) -> bool:
|
|
return (now - state.last_system_at) >= float(settings["system_stats_interval_seconds"])
|
|
|
|
|
|
def should_slow_poll(now: float, settings: dict, state: ProfilePollState) -> bool:
|
|
return (now - state.last_slow_at) >= float(settings["slow_stats_interval_seconds"])
|
|
|
|
|
|
def should_tracker_poll(now: float, settings: dict, state: ProfilePollState) -> bool:
|
|
return (now - state.last_tracker_at) >= float(settings["tracker_stats_interval_seconds"])
|
|
|
|
|
|
def should_disk_poll(now: float, settings: dict, state: ProfilePollState) -> bool:
|
|
return (now - state.last_disk_at) >= float(settings["disk_stats_interval_seconds"])
|
|
|
|
|
|
def should_queue_poll(now: float, settings: dict, state: ProfilePollState) -> bool:
|
|
return (now - state.last_queue_at) >= float(settings["queue_stats_interval_seconds"])
|
|
|
|
|
|
def should_heartbeat(now: float, settings: dict, state: ProfilePollState, changed: bool) -> bool:
|
|
if changed and settings.get("emit_heartbeat_on_change"):
|
|
return True
|
|
return (now - state.last_heartbeat_at) >= float(settings["heartbeat_interval_seconds"])
|
|
|
|
|
|
def mark_tick(state: ProfilePollState, started_at: float, active: bool, ok: bool, error: str = "", emitted_payload_size: int = 0, rtorrent_call_count: int = 0, skipped_emissions: int = 0, settings: dict | None = None) -> dict:
|
|
now = time.monotonic()
|
|
effective_settings = normalize_settings(settings) if settings is not None else DEFAULTS
|
|
previous_started_at = state.last_tick_started_at
|
|
state.tick_count += 1
|
|
state.last_tick_ms = round((now - started_at) * 1000.0, 2)
|
|
state.last_tick_gap_ms = round((started_at - previous_started_at) * 1000.0, 2) if previous_started_at else 0.0
|
|
state.last_tick_started_at = started_at
|
|
state.last_active = bool(active)
|
|
state.effective_interval_seconds = effective_fast_interval(effective_settings, state)
|
|
state.last_ok = bool(ok)
|
|
state.last_error = str(error or "")
|
|
state.emitted_payload_size = int(emitted_payload_size or 0)
|
|
state.rtorrent_call_count = int(rtorrent_call_count or 0)
|
|
state.skipped_emissions += int(skipped_emissions or 0)
|
|
adaptive_enabled = bool(effective_settings.get("adaptive_enabled", DEFAULTS["adaptive_enabled"]))
|
|
|
|
if not adaptive_enabled:
|
|
# Adaptive mode is explicitly disabled for this rTorrent profile. Keep metrics,
|
|
# but do not enter slowdown/recovery or preserve a stale adaptive state from
|
|
# earlier ticks; otherwise refreshes remain slow even with the toggle off.
|
|
state.error_count = 0 if ok else state.error_count + 1
|
|
state.slow_count = 0
|
|
state.adaptive_mode = "fixed"
|
|
else:
|
|
if ok:
|
|
state.error_count = 0
|
|
else:
|
|
state.error_count += 1
|
|
threshold = float(effective_settings.get("slow_response_threshold_ms") or DEFAULTS["slow_response_threshold_ms"])
|
|
recovery_after = int(effective_settings.get("recovery_after_errors") or DEFAULTS["recovery_after_errors"])
|
|
if state.last_tick_ms >= threshold:
|
|
state.slow_count += 1
|
|
state.adaptive_mode = "slowdown"
|
|
elif ok and state.error_count == 0 and state.slow_count:
|
|
state.slow_count = max(0, state.slow_count - 1)
|
|
if not ok and state.error_count >= recovery_after:
|
|
state.adaptive_mode = "recovery"
|
|
elif ok and state.slow_count == 0:
|
|
state.adaptive_mode = "normal" if state.last_active else "idle"
|
|
state.sleep_hint = max(MIN_POLL_INTERVAL_SECONDS, min(10.0, state.sleep_hint))
|
|
state.stats = {
|
|
"profile_id": state.profile_id,
|
|
"tick_count": state.tick_count,
|
|
"last_tick_ms": state.last_tick_ms,
|
|
"last_active": state.last_active,
|
|
"last_ok": state.last_ok,
|
|
"last_tick_gap_ms": state.last_tick_gap_ms,
|
|
"effective_interval_seconds": state.effective_interval_seconds,
|
|
"configured_min_interval_seconds": MIN_POLL_INTERVAL_SECONDS,
|
|
"last_error": state.last_error,
|
|
"duration_ms": state.last_tick_ms,
|
|
"emitted_payload_size": state.emitted_payload_size,
|
|
"rtorrent_call_count": state.rtorrent_call_count,
|
|
"skipped_emissions": state.skipped_emissions,
|
|
"adaptive_enabled": adaptive_enabled,
|
|
"adaptive_mode": state.adaptive_mode,
|
|
"error_count": state.error_count,
|
|
"slow_count": state.slow_count,
|
|
"updated_at": utcnow(),
|
|
}
|
|
return dict(state.stats)
|
|
|
|
|
|
def snapshot(profile_id: int) -> dict:
|
|
state = state_for(profile_id)
|
|
return dict(state.stats or {"profile_id": int(profile_id), "tick_count": state.tick_count})
|