light poller commit3

This commit is contained in:
Mateusz Gruszczyński
2026-05-27 15:13:22 +02:00
parent 054c9122f8
commit a8adee0f2f
6 changed files with 143 additions and 4 deletions

View File

@@ -23,7 +23,7 @@ from flask import Blueprint, jsonify, request, abort, send_file, redirect, Respo
from ..config import DB_PATH, JOBS_RETENTION_DAYS, SMART_QUEUE_HISTORY_RETENTION_DAYS, LOG_RETENTION_DAYS, WORKERS, PYTORRENT_TMP_DIR from ..config import DB_PATH, JOBS_RETENTION_DAYS, SMART_QUEUE_HISTORY_RETENTION_DAYS, LOG_RETENTION_DAYS, WORKERS, PYTORRENT_TMP_DIR
from ..db import connect, utcnow from ..db import connect, utcnow
from ..services.auth import current_user_id as default_user_id, current_user, list_users, save_user, delete_user, login_user, logout_user, enabled as auth_enabled, require_profile_write from ..services.auth import current_user_id as default_user_id, current_user, list_users, save_user, delete_user, login_user, logout_user, enabled as auth_enabled, require_profile_write
from ..services import preferences, rtorrent, torrent_stats, speed_peaks, tracker_cache, rss as rss_service, ratio_rules, backup as backup_service, download_planner, operation_logs from ..services import preferences, rtorrent, torrent_stats, speed_peaks, tracker_cache, rss as rss_service, ratio_rules, backup as backup_service, download_planner, operation_logs, poller_control
from ..services.torrent_cache import torrent_cache from ..services.torrent_cache import torrent_cache
from ..services.torrent_summary import cached_summary from ..services.torrent_summary import cached_summary
from ..services.workers import enqueue, list_jobs, cancel_job, retry_job, force_job, clear_jobs, emergency_clear_jobs from ..services.workers import enqueue, list_jobs, cancel_job, retry_job, force_job, clear_jobs, emergency_clear_jobs
@@ -290,6 +290,7 @@ def cleanup_summary() -> dict:
(profile_id,), (profile_id,),
) if profile_id else _table_count("operation_logs") ) if profile_id else _table_count("operation_logs")
operation_log_retention = operation_logs.get_settings(profile_id) if profile_id else operation_logs.get_settings(0) operation_log_retention = operation_logs.get_settings(profile_id) if profile_id else operation_logs.get_settings(0)
poller_runtime = poller_control.snapshot(profile_id) if profile_id else {}
return { return {
"jobs_total": _table_count("jobs"), "jobs_total": _table_count("jobs"),
"jobs_clearable": _table_count("jobs", "WHERE status NOT IN ('pending', 'running')"), "jobs_clearable": _table_count("jobs", "WHERE status NOT IN ('pending', 'running')"),
@@ -298,6 +299,7 @@ def cleanup_summary() -> dict:
"automation_history_total": _table_count("automation_history"), "automation_history_total": _table_count("automation_history"),
"planner_history_total": download_planner.history_count(profile_id) if profile_id else 0, "planner_history_total": download_planner.history_count(profile_id) if profile_id else 0,
"cache": _active_profile_cache_summary(profile_id if profile_id else None), "cache": _active_profile_cache_summary(profile_id if profile_id else None),
"poller_runtime": poller_runtime,
"retention_days": { "retention_days": {
"jobs": JOBS_RETENTION_DAYS, "jobs": JOBS_RETENTION_DAYS,
"smart_queue_history": SMART_QUEUE_HISTORY_RETENTION_DAYS, "smart_queue_history": SMART_QUEUE_HISTORY_RETENTION_DAYS,

View File

@@ -253,6 +253,18 @@ def cleanup_automations():
@bp.post("/cleanup/poller-diagnostics")
def cleanup_poller_diagnostics():
profile = preferences.active_profile()
if not profile:
return jsonify({"ok": False, "error": "No profile"}), 400
profile_id = int(profile["id"])
# Note: This cleanup clears only in-memory poller diagnostics; polling, settings and torrent state are preserved.
runtime = poller_control.reset_runtime_stats(profile_id)
return ok({"deleted": {"poller_runtime_counters": 1}, "runtime": runtime, "cleanup": cleanup_summary()})
@bp.post("/cleanup/all") @bp.post("/cleanup/all")
def cleanup_all(): def cleanup_all():
deleted_jobs = clear_jobs() deleted_jobs = clear_jobs()

View File

@@ -126,6 +126,24 @@ class ProfilePollState:
skipped_emissions: int = 0 skipped_emissions: int = 0
emitted_payload_size: int = 0 emitted_payload_size: int = 0
rtorrent_call_count: int = 0 rtorrent_call_count: int = 0
live_poll_count: int = 0
list_poll_count: int = 0
live_updated_total: int = 0
live_full_refresh_requested_total: int = 0
list_added_total: int = 0
list_updated_total: int = 0
list_removed_total: int = 0
last_live_duration_ms: float = 0.0
last_list_duration_ms: float = 0.0
last_live_updated_count: int = 0
last_list_added_count: int = 0
last_list_updated_count: int = 0
last_list_removed_count: int = 0
last_live_ok: bool = True
last_list_ok: bool = True
last_live_error: str = ""
last_list_error: str = ""
last_live_requires_full_refresh: bool = False
adaptive_mode: str = "normal" adaptive_mode: str = "normal"
slow_task_running: bool = False slow_task_running: bool = False
system_task_running: bool = False system_task_running: bool = False
@@ -206,6 +224,69 @@ def should_heartbeat(now: float, settings: dict, state: ProfilePollState, change
return (now - state.last_heartbeat_at) >= float(settings["heartbeat_interval_seconds"]) return (now - state.last_heartbeat_at) >= float(settings["heartbeat_interval_seconds"])
def mark_live_poll(state: ProfilePollState, started_at: float, ok: bool, error: str = "", updated_count: int = 0, requires_full_refresh: bool = False) -> None:
now = time.monotonic()
# Note: Live poller diagnostics track only lightweight speed/status refreshes, not the full torrent snapshot loop.
state.live_poll_count += 1
state.last_live_duration_ms = round((now - started_at) * 1000.0, 2)
state.last_live_updated_count = int(updated_count or 0)
state.live_updated_total += int(updated_count or 0)
state.last_live_requires_full_refresh = bool(requires_full_refresh)
if requires_full_refresh:
state.live_full_refresh_requested_total += 1
state.last_live_ok = bool(ok)
state.last_live_error = str(error or "")
def mark_list_poll(state: ProfilePollState, started_at: float, ok: bool, error: str = "", added_count: int = 0, updated_count: int = 0, removed_count: int = 0) -> None:
now = time.monotonic()
# Note: List poller diagnostics are separate because this slower loop runs full torrent snapshot reconciliation.
state.list_poll_count += 1
state.last_list_duration_ms = round((now - started_at) * 1000.0, 2)
state.last_list_added_count = int(added_count or 0)
state.last_list_updated_count = int(updated_count or 0)
state.last_list_removed_count = int(removed_count or 0)
state.list_added_total += int(added_count or 0)
state.list_updated_total += int(updated_count or 0)
state.list_removed_total += int(removed_count or 0)
state.last_list_ok = bool(ok)
state.last_list_error = str(error or "")
def reset_runtime_stats(profile_id: int) -> dict:
state = state_for(profile_id)
# Note: Cleanup resets diagnostic counters only; poller timers and saved settings keep running unchanged.
state.tick_count = 0
state.last_tick_ms = 0.0
state.last_tick_gap_ms = 0.0
state.last_tick_started_at = 0.0
state.error_count = 0
state.slow_count = 0
state.skipped_emissions = 0
state.emitted_payload_size = 0
state.rtorrent_call_count = 0
state.live_poll_count = 0
state.list_poll_count = 0
state.live_updated_total = 0
state.live_full_refresh_requested_total = 0
state.list_added_total = 0
state.list_updated_total = 0
state.list_removed_total = 0
state.last_live_duration_ms = 0.0
state.last_list_duration_ms = 0.0
state.last_live_updated_count = 0
state.last_list_added_count = 0
state.last_list_updated_count = 0
state.last_list_removed_count = 0
state.last_live_ok = True
state.last_list_ok = True
state.last_live_error = ""
state.last_list_error = ""
state.last_live_requires_full_refresh = False
state.stats = {}
return snapshot(profile_id)
def mark_tick(state: ProfilePollState, started_at: float, active: bool, ok: bool, error: str = "", emitted_payload_size: int = 0, rtorrent_call_count: int = 0, skipped_emissions: int = 0, settings: dict | None = None) -> dict: def mark_tick(state: ProfilePollState, started_at: float, active: bool, ok: bool, error: str = "", emitted_payload_size: int = 0, rtorrent_call_count: int = 0, skipped_emissions: int = 0, settings: dict | None = None) -> dict:
now = time.monotonic() now = time.monotonic()
effective_settings = normalize_settings(settings) if settings is not None else DEFAULTS effective_settings = normalize_settings(settings) if settings is not None else DEFAULTS
@@ -267,6 +348,24 @@ def mark_tick(state: ProfilePollState, started_at: float, active: bool, ok: bool
"adaptive_mode": state.adaptive_mode, "adaptive_mode": state.adaptive_mode,
"error_count": state.error_count, "error_count": state.error_count,
"slow_count": state.slow_count, "slow_count": state.slow_count,
"live_poll_count": state.live_poll_count,
"list_poll_count": state.list_poll_count,
"last_live_duration_ms": state.last_live_duration_ms,
"last_list_duration_ms": state.last_list_duration_ms,
"last_live_updated_count": state.last_live_updated_count,
"last_list_added_count": state.last_list_added_count,
"last_list_updated_count": state.last_list_updated_count,
"last_list_removed_count": state.last_list_removed_count,
"live_updated_total": state.live_updated_total,
"list_added_total": state.list_added_total,
"list_updated_total": state.list_updated_total,
"list_removed_total": state.list_removed_total,
"live_full_refresh_requested_total": state.live_full_refresh_requested_total,
"last_live_requires_full_refresh": state.last_live_requires_full_refresh,
"last_live_ok": state.last_live_ok,
"last_list_ok": state.last_list_ok,
"last_live_error": state.last_live_error,
"last_list_error": state.last_list_error,
"updated_at": utcnow(), "updated_at": utcnow(),
} }
return dict(state.stats) return dict(state.stats)
@@ -274,4 +373,26 @@ def mark_tick(state: ProfilePollState, started_at: float, active: bool, ok: bool
def snapshot(profile_id: int) -> dict: def snapshot(profile_id: int) -> dict:
state = state_for(profile_id) state = state_for(profile_id)
return dict(state.stats or {"profile_id": int(profile_id), "tick_count": state.tick_count}) data = dict(state.stats or {"profile_id": int(profile_id), "tick_count": state.tick_count})
# Note: Snapshot always exposes split-poller counters, even before the first post-cleanup tick rebuilds full stats.
data.update({
"live_poll_count": state.live_poll_count,
"list_poll_count": state.list_poll_count,
"last_live_duration_ms": state.last_live_duration_ms,
"last_list_duration_ms": state.last_list_duration_ms,
"last_live_updated_count": state.last_live_updated_count,
"last_list_added_count": state.last_list_added_count,
"last_list_updated_count": state.last_list_updated_count,
"last_list_removed_count": state.last_list_removed_count,
"live_updated_total": state.live_updated_total,
"list_added_total": state.list_added_total,
"list_updated_total": state.list_updated_total,
"list_removed_total": state.list_removed_total,
"live_full_refresh_requested_total": state.live_full_refresh_requested_total,
"last_live_requires_full_refresh": state.last_live_requires_full_refresh,
"last_live_ok": state.last_live_ok,
"last_list_ok": state.last_list_ok,
"last_live_error": state.last_live_error,
"last_list_error": state.last_list_error,
})
return data

View File

@@ -148,12 +148,14 @@ def register_socketio_handlers(socketio):
speed_status = _speed_status_from_rows(pid, rows) speed_status = _speed_status_from_rows(pid, rows)
if run_live: if run_live:
live_started = time.monotonic()
live = torrent_cache.refresh_live(profile) live = torrent_cache.refresh_live(profile)
rtorrent_call_count += 1 rtorrent_call_count += 1
state.last_live_at = now state.last_live_at = now
state.last_fast_at = now state.last_fast_at = now
ok = bool(live.get("ok")) ok = bool(live.get("ok"))
error = str(live.get("error") or "") error = str(live.get("error") or "")
poller_control.mark_live_poll(state, live_started, ok, error, len(live.get("updated") or []), bool(live.get("requires_full_refresh")))
rows = torrent_cache.snapshot(pid) rows = torrent_cache.snapshot(pid)
active = _is_active_rows(rows) active = _is_active_rows(rows)
speed_status = _speed_status_from_rows(pid, rows) if live.get("ok") else speed_status speed_status = _speed_status_from_rows(pid, rows) if live.get("ok") else speed_status
@@ -179,11 +181,13 @@ def register_socketio_handlers(socketio):
_emit_profile(socketio, "rtorrent_error", live, pid) _emit_profile(socketio, "rtorrent_error", live, pid)
if run_list: if run_list:
list_started = time.monotonic()
diff = torrent_cache.refresh(profile) diff = torrent_cache.refresh(profile)
rtorrent_call_count += 1 rtorrent_call_count += 1
state.last_list_at = now state.last_list_at = now
ok = bool(diff.get("ok")) ok = bool(diff.get("ok"))
error = str(diff.get("error") or "") error = str(diff.get("error") or "")
poller_control.mark_list_poll(state, list_started, ok, error, len(diff.get("added") or []), len(diff.get("updated") or []), len(diff.get("removed") or []))
rows = torrent_cache.snapshot(pid) rows = torrent_cache.snapshot(pid)
active = _is_active_rows(rows) active = _is_active_rows(rows)
speed_status = _speed_status_from_rows(pid, rows) if diff.get("ok") else speed_status speed_status = _speed_status_from_rows(pid, rows) if diff.get("ok") else speed_status

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long