From fc76ca19a1c76b2ca1f19b364c6f2e42508865e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Sun, 14 Jun 2026 13:57:48 +0200 Subject: [PATCH] background_workers --- pytorrent/__init__.py | 4 + pytorrent/routes/_shared.py | 187 +--------------- pytorrent/routes/system.py | 5 + pytorrent/services/automation_rules.py | 113 ++++++---- pytorrent/services/background_automations.py | 146 ++++++++++++ pytorrent/services/background_cache_warmup.py | 210 ++++++++++++++++++ pytorrent/services/download_planner.py | 42 +++- pytorrent/services/port_check.py | 195 ++++++++++++++++ pytorrent/services/startup_config.py | 109 +++++++-- pytorrent/services/tracker_cache.py | 47 ++++ 10 files changed, 809 insertions(+), 249 deletions(-) create mode 100644 pytorrent/services/background_automations.py create mode 100644 pytorrent/services/background_cache_warmup.py create mode 100644 pytorrent/services/port_check.py diff --git a/pytorrent/__init__.py b/pytorrent/__init__.py index e7f8243..5f423b9 100644 --- a/pytorrent/__init__.py +++ b/pytorrent/__init__.py @@ -143,6 +143,8 @@ def create_app() -> Flask: register_socketio_handlers(socketio) from .services.startup_config import schedule_startup_config_apply schedule_startup_config_apply(socketio) + from .services.background_automations import start_scheduler as start_background_automation_scheduler + start_background_automation_scheduler(socketio) from .services.rss import start_scheduler as start_rss_scheduler from .services.ratio_rules import start_scheduler as start_ratio_scheduler from .services.download_planner import start_scheduler as start_download_planner_scheduler @@ -151,4 +153,6 @@ def create_app() -> Flask: start_ratio_scheduler(socketio) start_download_planner_scheduler(socketio) start_backup_scheduler() + from .services.background_cache_warmup import start_scheduler as start_cache_warmup_scheduler + start_cache_warmup_scheduler(socketio) return app diff --git a/pytorrent/routes/_shared.py b/pytorrent/routes/_shared.py index bfbe712..6116b4f 100644 --- a/pytorrent/routes/_shared.py +++ b/pytorrent/routes/_shared.py @@ -52,192 +52,7 @@ def ok(payload=None): -PORT_CHECK_CACHE_SECONDS = 6 * 60 * 60 - - -def _app_setting_get(key: str): - with connect() as conn: - row = conn.execute("SELECT value FROM app_settings WHERE key=?", (key,)).fetchone() - return row.get("value") if row else None - - -def _app_setting_set(key: str, value: str): - with connect() as conn: - conn.execute("INSERT OR REPLACE INTO app_settings(key,value) VALUES(?,?)", (key, value)) - - -def _iso_from_epoch(value) -> str | None: - try: - return datetime.fromtimestamp(float(value), timezone.utc).isoformat(timespec="seconds") - except Exception: - return None - - -def _public_ip(profile: dict | None = None, force: bool = False) -> str: - if profile and bool(profile.get("is_remote")): - return rtorrent.remote_public_ip(profile, force=force) - req = urllib.request.Request("https://api.ipify.org", headers={"User-Agent": "pyTorrent/port-check"}) - with urllib.request.urlopen(req, timeout=8) as res: - return res.read(64).decode("utf-8", "replace").strip() - - -MAX_PORT_CHECK_CANDIDATES = 256 - - -def _parse_port_candidates(value: str, limit: int = MAX_PORT_CHECK_CANDIDATES) -> tuple[list[int], bool]: - """Return valid incoming port candidates from rTorrent network.port_range. - - Note: rTorrent may keep a range/list and pick a random port on start. - The old checker used only the first number, which produced false "closed" - results when another configured port was actually active. - """ - ports: list[int] = [] - seen: set[int] = set() - truncated = False - - def add(port: int) -> None: - nonlocal truncated - if not 1 <= port <= 65535 or port in seen: - return - if len(ports) >= limit: - truncated = True - return - seen.add(port) - ports.append(port) - - for start, end in re.findall(r"(\d{1,5})\s*-\s*(\d{1,5})", value or ""): - a, b = int(start), int(end) - if a > b: - a, b = b, a - for port in range(a, b + 1): - add(port) - if truncated: - break - - without_ranges = re.sub(r"\d{1,5}\s*-\s*\d{1,5}", " ", value or "") - for item in re.findall(r"\d{1,5}", without_ranges): - add(int(item)) - - return ports, truncated - - -def _incoming_ports(profile: dict) -> dict: - try: - raw_value = str(rtorrent.client_for(profile).call("network.port_range") or "") - except Exception: - raw_value = "" - ports, truncated = _parse_port_candidates(raw_value) - return {"ports": ports, "raw": raw_value, "truncated": truncated} - - -def _yougetsignal_check(public_ip: str, port: int) -> dict: - body = urllib.parse.urlencode({"remoteAddress": public_ip, "portNumber": str(port)}).encode("utf-8") - req = urllib.request.Request( - "https://ports.yougetsignal.com/check-port.php", - data=body, - headers={ - "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", - "User-Agent": "pyTorrent/port-check", - "Accept": "text/html,application/json,*/*", - }, - method="POST", - ) - with urllib.request.urlopen(req, timeout=12) as res: - text = res.read(8192).decode("utf-8", "replace") - low = text.lower() - if "is open" in low: - return {"status": "open", "source": "yougetsignal", "raw": text[:500]} - if "is closed" in low: - return {"status": "closed", "source": "yougetsignal", "raw": text[:500]} - return {"status": "unknown", "source": "yougetsignal", "raw": text[:500]} - - -def _local_port_fallback(public_ip: str, port: int) -> dict: - try: - with socket.create_connection((public_ip, port), timeout=3): - return {"status": "open", "source": "local-fallback"} - except Exception as exc: - return {"status": "unknown", "source": "local-fallback", "error": f"Local fallback inconclusive: {exc}"} - - -def _check_ports(public_ip: str, ports: list[int], checker) -> dict: - checked: list[int] = [] - first_closed: dict | None = None - last_result: dict = {"status": "unknown"} - - for port in ports: - checked.append(port) - current = checker(public_ip, port) - last_result = current - if current.get("status") == "open": - current.update({"port": port, "open_port": port, "checked_ports": checked}) - return current - if current.get("status") == "closed" and first_closed is None: - first_closed = current - - result = first_closed or last_result - result.update({"port": ports[0] if ports else None, "open_port": None, "checked_ports": checked}) - return result - - -def port_check_status(force: bool = False) -> dict: - profile = preferences.active_profile() - prefs = preferences.get_preferences() - enabled = bool((prefs or {}).get("port_check_enabled")) - if not profile: - return {"status": "unknown", "enabled": enabled, "error": "No profile"} - - port_info = _incoming_ports(profile) - ports = port_info["ports"] - if not ports: - return {"status": "unknown", "enabled": enabled, "error": "Cannot read rTorrent network.port_range"} - - ports_key = ",".join(str(port) for port in ports) - cache_key = f"port_check:{profile['id']}:{ports_key}:{int(bool(port_info['truncated']))}" - if not force: - cached = _app_setting_get(cache_key) - if cached: - try: - data = json.loads(cached) - if time.time() - float(data.get("checked_at_epoch") or 0) < PORT_CHECK_CACHE_SECONDS: - data["cached"] = True - data["enabled"] = enabled - if not data.get("checked_at"): - data["checked_at"] = _iso_from_epoch(data.get("checked_at_epoch")) - return data - except Exception: - pass - - checked_at_epoch = time.time() - result = { - "status": "unknown", - "enabled": enabled, - "port": ports[0], - "ports": ports, - "port_range": port_info["raw"], - "ports_truncated": port_info["truncated"], - "checked_at_epoch": checked_at_epoch, - "checked_at": _iso_from_epoch(checked_at_epoch), - "cached": False, - } - try: - public_ip = _public_ip(profile, force=force) - result["public_ip"] = public_ip - result["remote"] = bool(profile.get("is_remote")) - result.update(_check_ports(public_ip, ports, _yougetsignal_check)) - except Exception as exc: - result["error"] = f"YouGetSignal failed: {exc}" - try: - public_ip = result.get("public_ip") or _public_ip(profile, force=force) - result["public_ip"] = public_ip - result["remote"] = bool(profile.get("is_remote")) - result.update(_check_ports(public_ip, ports, _local_port_fallback)) - except Exception as fallback_exc: - result["fallback_error"] = str(fallback_exc) - result["source"] = "none" - _app_setting_set(cache_key, json.dumps(result)) - return result - +from ..services.port_check import port_check_status diff --git a/pytorrent/routes/system.py b/pytorrent/routes/system.py index 0d47a48..99ab086 100644 --- a/pytorrent/routes/system.py +++ b/pytorrent/routes/system.py @@ -125,6 +125,11 @@ def app_status(): status["port_check"] = {"status": "disabled", "enabled": False} if not bool((prefs or {}).get("port_check_enabled")) else port_check_status(force=False) except Exception as exc: status["port_check"] = {"status": "error", "error": str(exc)} + try: + from ..services import background_cache_warmup + status["background_cache_warmup"] = background_cache_warmup.status() + except Exception as exc: + status["background_cache_warmup"] = {"started": False, "error": str(exc)} status["api_ms"] = round((time.perf_counter() - started) * 1000, 2) return ok({"status": status}) diff --git a/pytorrent/services/automation_rules.py b/pytorrent/services/automation_rules.py index 2a629ae..ed042ee 100644 --- a/pytorrent/services/automation_rules.py +++ b/pytorrent/services/automation_rules.py @@ -2,6 +2,7 @@ from __future__ import annotations from datetime import datetime, timezone from typing import Any import json +import threading from ..db import connect, default_user_id, utcnow from . import rtorrent, auth from .preferences import active_profile @@ -9,6 +10,19 @@ from .workers import enqueue AUTOMATION_JOB_CHUNK_SIZE = 100 AUTOMATION_LIGHT_ACTIONS = {'start', 'stop', 'pause', 'resume', 'set_label'} +_CHECK_LOCKS: dict[tuple[int, int | None], threading.Lock] = {} +_CHECK_LOCKS_GUARD = threading.Lock() + + +def _check_lock(profile_id: int, rule_id: int | None = None) -> threading.Lock: + """Prevent overlapping automation runs for the same profile or rule.""" + key = (int(profile_id), int(rule_id) if rule_id is not None else None) + with _CHECK_LOCKS_GUARD: + if key not in _CHECK_LOCKS: + _CHECK_LOCKS[key] = threading.Lock() + return _CHECK_LOCKS[key] + + def _resolve_user_id(profile: dict[str, Any] | None = None, user_id: int | None = None) -> int: @@ -457,50 +471,57 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = profile_id = int(profile['id']) if rule_id is not None: _require_profile_read(profile_id, user_id) - rules = _list_enabled_rules_for_profile(profile_id, rule_id=rule_id, force=force) - if not rules: - return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0} - torrents = rtorrent.list_torrents(profile) - applied = [] - batches = [] - now = utcnow() - planned: list[dict[str, Any]] = [] - with connect() as conn: - for rule in rules: - if not force and not _cooldown_ok(conn, rule, profile_id): - continue - matched = [t for t in torrents if _conditions_match(conn, rule, profile_id, t)] - if not matched: - continue - hashes = [str(t.get('hash') or '') for t in matched if str(t.get('hash') or '')] - if hashes: - planned.append({'rule': rule, 'matched': matched, 'hashes': hashes}) - for item in planned: - rule = item['rule'] - matched = item['matched'] - hashes = item['hashes'] - owner_id = int(rule.get('user_id') or rule.get('owner_user_id') or default_user_id()) - if not auth.can_write_profile(profile_id, owner_id): - batch = _record_skipped_rule(profile_id, rule, hashes, 'Rule owner no longer has write access to profile', now) - batches.append(batch) - continue - try: - actions = _apply_effects_bulk(None, profile, matched, rule.get('effects') or [], rule, owner_id) - except Exception as exc: - actions = [{'error': str(exc), 'count': len(hashes), 'target_hashes': hashes}] - changed_hashes = sorted({h for a in actions for h in (a.get('target_hashes') or [])}) - if not actions or not changed_hashes: - continue - history_actions = [{k: v for k, v in a.items() if k != 'target_hashes'} for a in actions] - matched_by_hash = {str(t.get('hash') or ''): t for t in matched} + lock = _check_lock(profile_id, rule_id) + if not lock.acquire(blocking=False): + # Note: Browser, manual and background checks can now coexist without duplicate rule application. + return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0, 'skipped': True, 'reason': 'Automation check already running'} + try: + rules = _list_enabled_rules_for_profile(profile_id, rule_id=rule_id, force=force) + if not rules: + return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0} + torrents = rtorrent.list_torrents(profile) + applied = [] + batches = [] + now = utcnow() + planned: list[dict[str, Any]] = [] with connect() as conn: - for h in changed_hashes: - t = matched_by_hash.get(h, {}) - conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_matched_at,last_applied_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_matched_at=excluded.last_matched_at, last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at', (rule['id'], profile_id, h, now, now, now)) - applied.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'hash': h, 'name': t.get('name'), 'actions': [{'type': a.get('type', 'error'), 'count': a.get('count', len(changed_hashes))} for a in actions]}) - _mark_rule_cooldown(conn, rule, profile_id, now) - torrent_name = str(matched_by_hash.get(changed_hashes[0], {}).get('name') or '') if len(changed_hashes) == 1 else f'{len(changed_hashes)} torrents' - torrent_hash = changed_hashes[0] if len(changed_hashes) == 1 else f'batch:{rule["id"]}:{now}' - conn.execute('INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (owner_id, profile_id, rule['id'], torrent_hash, torrent_name, str(rule.get('name') or ''), json.dumps(history_actions), now)) - batches.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'count': len(changed_hashes), 'actions': history_actions}) - return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied, 'batches': batches} + for rule in rules: + if not force and not _cooldown_ok(conn, rule, profile_id): + continue + matched = [t for t in torrents if _conditions_match(conn, rule, profile_id, t)] + if not matched: + continue + hashes = [str(t.get('hash') or '') for t in matched if str(t.get('hash') or '')] + if hashes: + planned.append({'rule': rule, 'matched': matched, 'hashes': hashes}) + for item in planned: + rule = item['rule'] + matched = item['matched'] + hashes = item['hashes'] + owner_id = int(rule.get('user_id') or rule.get('owner_user_id') or default_user_id()) + if not auth.can_write_profile(profile_id, owner_id): + batch = _record_skipped_rule(profile_id, rule, hashes, 'Rule owner no longer has write access to profile', now) + batches.append(batch) + continue + try: + actions = _apply_effects_bulk(None, profile, matched, rule.get('effects') or [], rule, owner_id) + except Exception as exc: + actions = [{'error': str(exc), 'count': len(hashes), 'target_hashes': hashes}] + changed_hashes = sorted({h for a in actions for h in (a.get('target_hashes') or [])}) + if not actions or not changed_hashes: + continue + history_actions = [{k: v for k, v in a.items() if k != 'target_hashes'} for a in actions] + matched_by_hash = {str(t.get('hash') or ''): t for t in matched} + with connect() as conn: + for h in changed_hashes: + t = matched_by_hash.get(h, {}) + conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_matched_at,last_applied_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_matched_at=excluded.last_matched_at, last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at', (rule['id'], profile_id, h, now, now, now)) + applied.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'hash': h, 'name': t.get('name'), 'actions': [{'type': a.get('type', 'error'), 'count': a.get('count', len(changed_hashes))} for a in actions]}) + _mark_rule_cooldown(conn, rule, profile_id, now) + torrent_name = str(matched_by_hash.get(changed_hashes[0], {}).get('name') or '') if len(changed_hashes) == 1 else f'{len(changed_hashes)} torrents' + torrent_hash = changed_hashes[0] if len(changed_hashes) == 1 else f'batch:{rule["id"]}:{now}' + conn.execute('INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (owner_id, profile_id, rule['id'], torrent_hash, torrent_name, str(rule.get('name') or ''), json.dumps(history_actions), now)) + batches.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'count': len(changed_hashes), 'actions': history_actions}) + return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied, 'batches': batches} + finally: + lock.release() diff --git a/pytorrent/services/background_automations.py b/pytorrent/services/background_automations.py new file mode 100644 index 0000000..03f8abc --- /dev/null +++ b/pytorrent/services/background_automations.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +import os +import threading +import time +from typing import Any + +from ..db import connect, default_user_id +from . import automation_rules, operation_logs, poller_control, rtorrent +from .websocket import emit_profile_event + +_started = False +_start_lock = threading.Lock() +_profile_locks: dict[int, threading.Lock] = {} +_profile_locks_lock = threading.Lock() +_last_logged_status: dict[int, str] = {} + + +def _configured_interval() -> float: + """Return the minimum background automation interval from environment settings.""" + try: + return max(5.0, min(3600.0, float(os.environ.get("PYTORRENT_AUTOMATION_BACKGROUND_INTERVAL_SECONDS", "15")))) + except Exception: + return 15.0 + + +def _profiles() -> list[dict[str, Any]]: + """Read configured profiles without relying on a browser session.""" + with connect() as conn: + return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()] + + +def _profile_lock(profile_id: int) -> threading.Lock: + """Keep one automation pass per profile active at a time.""" + with _profile_locks_lock: + if profile_id not in _profile_locks: + _profile_locks[profile_id] = threading.Lock() + return _profile_locks[profile_id] + + +def _owner_user_id(profile: dict[str, Any]) -> int: + """Use the profile owner for background checks so rule permissions stay stable.""" + return int(profile.get("user_id") or default_user_id()) + + +def _profile_interval(profile_id: int) -> float: + """Reuse the existing queue poller cadence instead of adding another UI setting.""" + settings = poller_control.get_settings(profile_id) + return max(_configured_interval(), float(settings.get("queue_stats_interval_seconds") or 15.0)) + + +def _connected(profile: dict[str, Any]) -> tuple[bool, str]: + """Verify rTorrent connectivity before running automation logic.""" + try: + rtorrent.client_for(profile).call("system.client_version") + return True, "" + except Exception as exc: + return False, str(exc) + + +def _log_status(profile_id: int, status: str, message: str, *, error: str = "") -> None: + """Log only connectivity state changes to avoid noisy system logs.""" + if _last_logged_status.get(profile_id) == status: + return + _last_logged_status[profile_id] = status + severity = "warning" if error else "info" + operation_logs.record( + profile_id, + "background_automation_status", + message, + severity=severity, + source="system", + action="background_automation", + details={"status": status, "error": error}, + ) + + +def _run_profile(socketio, profile: dict[str, Any]) -> None: + """Run one safe background automation pass for a connected profile.""" + profile_id = int(profile.get("id") or 0) + if not profile_id: + return + lock = _profile_lock(profile_id) + if not lock.acquire(blocking=False): + return + try: + ok, error = _connected(profile) + if not ok: + _log_status(profile_id, "disconnected", f"Background automations waiting for rTorrent: {error}", error=error) + return + _log_status(profile_id, "connected", "Background automations detected a working rTorrent connection") + result = automation_rules.check(profile, user_id=_owner_user_id(profile), force=False) + if result.get("applied") or result.get("batches"): + operation_logs.record( + profile_id, + "background_automation_run", + "Background automations applied matching rules", + source="system", + action="background_automation", + details={"applied": len(result.get("applied") or []), "batches": len(result.get("batches") or []), "result": result}, + user_id=_owner_user_id(profile), + ) + emit_profile_event(socketio, "automation_update", result, profile_id) + except Exception as exc: + operation_logs.record( + profile_id, + "background_automation_error", + f"Background automation check failed: {exc}", + severity="warning", + source="system", + action="background_automation", + details={"error": str(exc)}, + user_id=_owner_user_id(profile), + ) + finally: + lock.release() + + +def start_scheduler(socketio) -> None: + """Start browser-independent automation checks once per application process.""" + global _started + with _start_lock: + if _started: + return + _started = True + + def runner() -> None: + last_run: dict[int, float] = {} + while True: + started = time.monotonic() + next_sleep = _configured_interval() + for profile in _profiles(): + profile_id = int(profile.get("id") or 0) + if not profile_id: + continue + interval = _profile_interval(profile_id) + elapsed = started - float(last_run.get(profile_id) or 0.0) + if elapsed < interval: + next_sleep = min(next_sleep, max(1.0, interval - elapsed)) + continue + last_run[profile_id] = started + _run_profile(socketio, profile) + next_sleep = min(next_sleep, interval) + socketio.sleep(max(1.0, next_sleep)) + + socketio.start_background_task(runner) diff --git a/pytorrent/services/background_cache_warmup.py b/pytorrent/services/background_cache_warmup.py new file mode 100644 index 0000000..1b002bf --- /dev/null +++ b/pytorrent/services/background_cache_warmup.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import os +import threading +import time +from typing import Any + +from ..db import connect, default_user_id +from . import port_check, preferences, rtorrent, tracker_cache +from .torrent_cache import torrent_cache + +STARTUP_DELAY_SECONDS = 60 +DEFAULT_TRACKER_INTERVAL_SECONDS = 15 * 60 +DEFAULT_PORT_INTERVAL_SECONDS = port_check.PORT_CHECK_CACHE_SECONDS +FAVICON_BATCH_SIZE = 20 + +_started = False +_start_lock = threading.Lock() +_status_lock = threading.Lock() +_status: dict[str, Any] = { + "started": False, + "tracker_warmup": {}, + "port_check": {}, +} + + +def _setting_float(name: str, default: float, minimum: float, maximum: float) -> float: + """Read a bounded worker interval from the environment.""" + # Note: Defaults keep the worker light while still making UI-independent caches fresh after startup. + try: + value = float(os.environ.get(name, str(default))) + except Exception: + value = default + return max(minimum, min(maximum, value)) + + +def _profiles() -> list[dict[str, Any]]: + """Read every rTorrent profile directly from the database.""" + # Note: The worker cannot rely on active browser session state, so it iterates real configured profiles. + with connect() as conn: + return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()] + + +def _owner_user_id(profile: dict[str, Any]) -> int: + """Return the profile owner used for profile-scoped preferences.""" + return int(profile.get("user_id") or default_user_id()) + + +def _connected(profile: dict[str, Any]) -> tuple[bool, str]: + """Check rTorrent connectivity without changing user state.""" + try: + rtorrent.client_for(profile).call("system.client_version") + return True, "" + except Exception as exc: + return False, str(exc) + + +def _remember(section: str, profile_id: int, payload: dict[str, Any]) -> None: + """Store lightweight in-memory diagnostics for app/status.""" + # Note: Cache warmups are not user operations, so they stay out of operation logs by default. + with _status_lock: + data = dict(_status.get(section) or {}) + data[str(profile_id)] = {**payload, "updated_at_epoch": time.time()} + _status[section] = data + + +def status() -> dict[str, Any]: + """Return current worker diagnostics for system status endpoints.""" + with _status_lock: + return { + "started": bool(_status.get("started")), + "startup_delay_seconds": STARTUP_DELAY_SECONDS, + "tracker_warmup": dict(_status.get("tracker_warmup") or {}), + "port_check": dict(_status.get("port_check") or {}), + } + + +def _tracker_domains_from_rows(rows: list[dict[str, Any]], summary: dict[str, Any], profile_id: int) -> list[str]: + """Build a bounded tracker domain list from fresh summary data and cached rows.""" + domains = [str(item.get("domain") or "") for item in summary.get("trackers") or []] + if not domains: + domains = tracker_cache.cached_domains_for_profile(profile_id, limit=200) + return domains + + +def _warm_tracker_profile(profile: dict[str, Any]) -> None: + """Warm tracker summary cache and optional favicon cache for one profile.""" + # Note: This mirrors the sidebar warmup, but runs from the backend scheduler instead of waiting for the filter panel. + profile_id = int(profile.get("id") or 0) + if not profile_id: + return + ok, error = _connected(profile) + if not ok: + _remember("tracker_warmup", profile_id, {"ok": False, "skipped": True, "reason": "rtorrent_disconnected", "error": error}) + return + + owner_id = _owner_user_id(profile) + prefs = preferences.get_preferences(owner_id, profile_id) + rows = torrent_cache.snapshot(profile_id) + if not rows: + torrent_cache.refresh(profile) + rows = torrent_cache.snapshot(profile_id) + hashes = [str(row.get("hash") or "") for row in rows if row.get("hash")] + if not hashes: + _remember("tracker_warmup", profile_id, {"ok": True, "skipped": True, "reason": "no_torrents"}) + return + + loader = lambda h: rtorrent.torrent_trackers(profile, h) + summary = tracker_cache.summary(profile, hashes, loader, scan_limit=tracker_cache.TRACKER_SCAN_LIMIT, include_favicons=False) + warming = False + if int(summary.get("pending") or 0) > 0: + warming = tracker_cache.warm_summary_cache(profile, hashes, loader, batch_size=tracker_cache.TRACKER_SCAN_LIMIT) + + favicon_result = {"checked": 0, "cached": 0, "errors": []} + if bool((prefs or {}).get("tracker_favicons_enabled")): + domains = _tracker_domains_from_rows(rows, summary, profile_id) + favicon_result = tracker_cache.warm_favicon_cache(domains, enabled=True, limit=FAVICON_BATCH_SIZE, force=False) + + _remember( + "tracker_warmup", + profile_id, + { + "ok": True, + "hashes": len(hashes), + "pending": int(summary.get("pending") or 0), + "scanned_now": int(summary.get("scanned_now") or 0), + "warming": bool(warming), + "favicons_enabled": bool((prefs or {}).get("tracker_favicons_enabled")), + "favicons": favicon_result, + }, + ) + + +def _check_port_profile(profile: dict[str, Any]) -> None: + """Refresh incoming-port status when the profile preference enables it.""" + # Note: force=False respects the existing six-hour cache and avoids unnecessary external checks. + profile_id = int(profile.get("id") or 0) + if not profile_id: + return + owner_id = _owner_user_id(profile) + prefs = preferences.get_preferences(owner_id, profile_id) + if not bool((prefs or {}).get("port_check_enabled")): + _remember("port_check", profile_id, {"ok": True, "enabled": False, "skipped": True, "reason": "disabled"}) + return + result = port_check.port_check_status(profile=profile, force=False, user_id=owner_id) + _remember( + "port_check", + profile_id, + { + "ok": not bool(result.get("error") and result.get("source") == "none"), + "enabled": True, + "status": result.get("status"), + "cached": bool(result.get("cached")), + "checked_at": result.get("checked_at"), + "error": result.get("error") or result.get("fallback_error") or "", + }, + ) + + +def start_scheduler(socketio=None) -> None: + """Start browser-independent cache warmup and port-check scheduler.""" + global _started + with _start_lock: + if _started: + return + _started = True + with _status_lock: + _status["started"] = True + + tracker_interval = _setting_float("PYTORRENT_CACHE_WARMUP_INTERVAL_SECONDS", DEFAULT_TRACKER_INTERVAL_SECONDS, 60.0, 24 * 60 * 60.0) + port_interval = _setting_float("PYTORRENT_PORT_CHECK_INTERVAL_SECONDS", DEFAULT_PORT_INTERVAL_SECONDS, 60.0, 24 * 60 * 60.0) + + def runner() -> None: + time.sleep(STARTUP_DELAY_SECONDS) + last_tracker: dict[int, float] = {} + last_port: dict[int, float] = {} + while True: + now = time.monotonic() + next_sleep = 60.0 + for profile in _profiles(): + profile_id = int(profile.get("id") or 0) + if not profile_id: + continue + if now - float(last_tracker.get(profile_id) or 0.0) >= tracker_interval: + last_tracker[profile_id] = now + try: + _warm_tracker_profile(profile) + except Exception as exc: + _remember("tracker_warmup", profile_id, {"ok": False, "error": str(exc)}) + if now - float(last_port.get(profile_id) or 0.0) >= port_interval: + last_port[profile_id] = now + try: + _check_port_profile(profile) + except Exception as exc: + _remember("port_check", profile_id, {"ok": False, "error": str(exc)}) + next_sleep = min( + next_sleep, + max(1.0, tracker_interval - (time.monotonic() - float(last_tracker.get(profile_id) or 0.0))), + max(1.0, port_interval - (time.monotonic() - float(last_port.get(profile_id) or 0.0))), + ) + sleep_for = max(5.0, min(60.0, next_sleep)) + if socketio: + socketio.sleep(sleep_for) + else: + time.sleep(sleep_for) + + if socketio: + socketio.start_background_task(runner) + else: + threading.Thread(target=runner, daemon=True, name="pytorrent-cache-warmup-scheduler").start() diff --git a/pytorrent/services/download_planner.py b/pytorrent/services/download_planner.py index b4adb50..297ac94 100644 --- a/pytorrent/services/download_planner.py +++ b/pytorrent/services/download_planner.py @@ -8,7 +8,10 @@ from typing import Any import psutil from ..db import connect, default_user_id, utcnow -from . import auth, rtorrent +from . import auth, operation_logs, rtorrent + +PLANNER_STARTUP_DELAY_SECONDS = 60 +_APP_STARTED_AT = time.monotonic() DEFAULTS = { "enabled": False, @@ -45,6 +48,34 @@ DEFAULTS = { _LAST_RUN: dict[int, float] = {} _LAST_LIMITS: dict[int, tuple[int, int]] = {} _HIGH_CPU_SINCE: dict[int, float] = {} +_PLANNER_CONNECTION_STATUS: dict[int, str] = {} + + +def _rtorrent_ready(profile: dict) -> tuple[bool, str]: + """Check rTorrent connectivity before the planner evaluates or applies changes.""" + try: + rtorrent.client_for(profile).call("system.client_version") + return True, "" + except Exception as exc: + return False, str(exc) + + +def _log_connection_status(profile: dict, status: str, message: str, *, error: str = "", user_id: int | None = None) -> None: + """Record planner connectivity state changes as system operations without noisy repeats.""" + profile_id = int(profile.get("id") or 0) + if _PLANNER_CONNECTION_STATUS.get(profile_id) == status: + return + _PLANNER_CONNECTION_STATUS[profile_id] = status + operation_logs.record( + profile_id, + "download_planner_status", + message, + severity="warning" if error else "info", + source="system", + action="download_planner", + details={"status": status, "error": error}, + user_id=user_id or int(profile.get("user_id") or 0) or None, + ) def _bool(value: Any) -> bool: @@ -471,11 +502,20 @@ def enforce(profile: dict, force: bool = False, user_id: int | None = None) -> d return {"ok": True, "enabled": False, "profile_id": profile_id, "skipped": True, "reason": "planner owner has no write access", "history": history(profile_id, 20), "history_total": history_count(profile_id)} if not settings.get("enabled"): return {"ok": True, "enabled": False, "profile_id": profile_id, "history": history(profile_id, 20), "history_total": history_count(profile_id), "preview": preview(profile, user_id=user_id)} + startup_remaining = int(PLANNER_STARTUP_DELAY_SECONDS - (time.monotonic() - _APP_STARTED_AT)) + if not force and startup_remaining > 0: + # Note: The background planner keeps the same startup grace as rTorrent config apply, while manual checks still run immediately. + return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True, "reason": "startup_delay", "retry_after_seconds": startup_remaining} now = time.monotonic() interval = int(settings.get("check_interval_seconds") or 30) if not force and now - _LAST_RUN.get(profile_id, 0) < interval: return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True} _LAST_RUN[profile_id] = now + ready, connection_error = _rtorrent_ready(profile) + if not ready: + _log_connection_status(profile, "waiting", f"Download Planner is waiting for rTorrent: {connection_error}", error=connection_error, user_id=user_id) + return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True, "reason": "rtorrent_unavailable", "error": connection_error, "retry_after_seconds": interval} + _log_connection_status(profile, "connected", "Download Planner detected a working rTorrent connection", user_id=user_id) decision = evaluate(profile, settings) result: dict[str, Any] = {"ok": True, "enabled": True, **decision, "limits_changed": False, "paused": 0, "resumed": 0} wanted_limits = (int(decision["down"]), int(decision["up"])) diff --git a/pytorrent/services/port_check.py b/pytorrent/services/port_check.py new file mode 100644 index 0000000..9b334c1 --- /dev/null +++ b/pytorrent/services/port_check.py @@ -0,0 +1,195 @@ +from __future__ import annotations + +import json +import re +import socket +import time +import urllib.parse +import urllib.request +from datetime import datetime, timezone +from typing import Any + +from ..db import connect +from . import preferences, rtorrent + +PORT_CHECK_CACHE_SECONDS = 6 * 60 * 60 +MAX_PORT_CHECK_CANDIDATES = 256 + + +def _app_setting_get(key: str) -> str | None: + with connect() as conn: + row = conn.execute("SELECT value FROM app_settings WHERE key=?", (key,)).fetchone() + return row.get("value") if row else None + + +def _app_setting_set(key: str, value: str) -> None: + with connect() as conn: + conn.execute("INSERT OR REPLACE INTO app_settings(key,value) VALUES(?,?)", (key, value)) + + +def _iso_from_epoch(value: Any) -> str | None: + try: + return datetime.fromtimestamp(float(value), timezone.utc).isoformat(timespec="seconds") + except Exception: + return None + + +def _public_ip(profile: dict | None = None, force: bool = False) -> str: + if profile and bool(profile.get("is_remote")): + return rtorrent.remote_public_ip(profile, force=force) + req = urllib.request.Request("https://api.ipify.org", headers={"User-Agent": "pyTorrent/port-check"}) + with urllib.request.urlopen(req, timeout=8) as res: + return res.read(64).decode("utf-8", "replace").strip() + + +def _parse_port_candidates(value: str, limit: int = MAX_PORT_CHECK_CANDIDATES) -> tuple[list[int], bool]: + """Return valid incoming port candidates from rTorrent network.port_range.""" + # Note: rTorrent can keep a range/list and pick a random port on start, so the checker tests all safe candidates. + ports: list[int] = [] + seen: set[int] = set() + truncated = False + + def add(port: int) -> None: + nonlocal truncated + if not 1 <= port <= 65535 or port in seen: + return + if len(ports) >= limit: + truncated = True + return + seen.add(port) + ports.append(port) + + for start, end in re.findall(r"(\d{1,5})\s*-\s*(\d{1,5})", value or ""): + a, b = int(start), int(end) + if a > b: + a, b = b, a + for port in range(a, b + 1): + add(port) + if truncated: + break + + without_ranges = re.sub(r"\d{1,5}\s*-\s*\d{1,5}", " ", value or "") + for item in re.findall(r"\d{1,5}", without_ranges): + add(int(item)) + + return ports, truncated + + +def _incoming_ports(profile: dict) -> dict: + try: + raw_value = str(rtorrent.client_for(profile).call("network.port_range") or "") + except Exception: + raw_value = "" + ports, truncated = _parse_port_candidates(raw_value) + return {"ports": ports, "raw": raw_value, "truncated": truncated} + + +def _yougetsignal_check(public_ip: str, port: int) -> dict: + body = urllib.parse.urlencode({"remoteAddress": public_ip, "portNumber": str(port)}).encode("utf-8") + req = urllib.request.Request( + "https://ports.yougetsignal.com/check-port.php", + data=body, + headers={ + "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", + "User-Agent": "pyTorrent/port-check", + "Accept": "text/html,application/json,*/*", + }, + method="POST", + ) + with urllib.request.urlopen(req, timeout=12) as res: + text = res.read(8192).decode("utf-8", "replace") + low = text.lower() + if "is open" in low: + return {"status": "open", "source": "yougetsignal", "raw": text[:500]} + if "is closed" in low: + return {"status": "closed", "source": "yougetsignal", "raw": text[:500]} + return {"status": "unknown", "source": "yougetsignal", "raw": text[:500]} + + +def _local_port_fallback(public_ip: str, port: int) -> dict: + try: + with socket.create_connection((public_ip, port), timeout=3): + return {"status": "open", "source": "local-fallback"} + except Exception as exc: + return {"status": "unknown", "source": "local-fallback", "error": f"Local fallback inconclusive: {exc}"} + + +def _check_ports(public_ip: str, ports: list[int], checker) -> dict: + checked: list[int] = [] + first_closed: dict | None = None + last_result: dict = {"status": "unknown"} + + for port in ports: + checked.append(port) + current = checker(public_ip, port) + last_result = current + if current.get("status") == "open": + current.update({"port": port, "open_port": port, "checked_ports": checked}) + return current + if current.get("status") == "closed" and first_closed is None: + first_closed = current + + result = first_closed or last_result + result.update({"port": ports[0] if ports else None, "open_port": None, "checked_ports": checked}) + return result + + +def port_check_status(profile: dict | None = None, force: bool = False, user_id: int | None = None) -> dict: + """Return cached or freshly checked incoming-port status for one rTorrent profile.""" + # Note: This service is shared by UI routes and the background worker, so browser startup is not required. + profile = profile or preferences.active_profile(user_id) + prefs = preferences.get_preferences(user_id, int(profile.get("id"))) if profile else preferences.get_preferences(user_id) + enabled = bool((prefs or {}).get("port_check_enabled")) + if not profile: + return {"status": "unknown", "enabled": enabled, "error": "No profile"} + + port_info = _incoming_ports(profile) + ports = port_info["ports"] + if not ports: + return {"status": "unknown", "enabled": enabled, "error": "Cannot read rTorrent network.port_range"} + + ports_key = ",".join(str(port) for port in ports) + cache_key = f"port_check:{profile['id']}:{ports_key}:{int(bool(port_info['truncated']))}" + if not force: + cached = _app_setting_get(cache_key) + if cached: + try: + data = json.loads(cached) + if time.time() - float(data.get("checked_at_epoch") or 0) < PORT_CHECK_CACHE_SECONDS: + data["cached"] = True + data["enabled"] = enabled + if not data.get("checked_at"): + data["checked_at"] = _iso_from_epoch(data.get("checked_at_epoch")) + return data + except Exception: + pass + + checked_at_epoch = time.time() + result = { + "status": "unknown", + "enabled": enabled, + "port": ports[0], + "ports": ports, + "port_range": port_info["raw"], + "ports_truncated": port_info["truncated"], + "checked_at_epoch": checked_at_epoch, + "checked_at": _iso_from_epoch(checked_at_epoch), + "cached": False, + } + try: + public_ip = _public_ip(profile, force=force) + result["public_ip"] = public_ip + result["remote"] = bool(profile.get("is_remote")) + result.update(_check_ports(public_ip, ports, _yougetsignal_check)) + except Exception as exc: + result["error"] = f"YouGetSignal failed: {exc}" + try: + public_ip = result.get("public_ip") or _public_ip(profile, force=force) + result["public_ip"] = public_ip + result["remote"] = bool(profile.get("is_remote")) + result.update(_check_ports(public_ip, ports, _local_port_fallback)) + except Exception as fallback_exc: + result["fallback_error"] = str(fallback_exc) + result["source"] = "none" + _app_setting_set(cache_key, json.dumps(result)) + return result diff --git a/pytorrent/services/startup_config.py b/pytorrent/services/startup_config.py index 9075026..a6cc305 100644 --- a/pytorrent/services/startup_config.py +++ b/pytorrent/services/startup_config.py @@ -1,26 +1,103 @@ from __future__ import annotations -from time import sleep -from . import preferences, rtorrent +import threading +from time import monotonic + +from ..db import connect +from . import operation_logs, rtorrent _started = False +_start_lock = threading.Lock() +_applied_profiles: set[int] = set() +_last_status: dict[int, str] = {} -def schedule_startup_config_apply(socketio, delay_seconds: int = 60) -> None: - """Apply saved rTorrent UI overrides after pyTorrent has been running for a moment.""" - global _started - if _started: +def _profiles() -> list[dict]: + """Read all configured profiles because startup work has no browser user session.""" + with connect() as conn: + return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()] + + +def _log_status(profile: dict, status: str, message: str, *, error: str = "", result: dict | None = None) -> None: + """Write meaningful startup config state changes as system operations.""" + profile_id = int(profile.get("id") or 0) + if status in {"waiting", "skipped"} and _last_status.get(profile_id) == status: return - _started = True + _last_status[profile_id] = status + operation_logs.record( + profile_id, + "rtorrent_config_startup", + message, + severity="warning" if error else "info", + source="system", + action="rtorrent_config", + details={"status": status, "error": error, "result": result or {}}, + user_id=int(profile.get("user_id") or 0) or None, + ) - def runner(): - sleep(max(0, int(delay_seconds))) - try: - for profile in preferences.list_profiles(): - result = rtorrent.apply_startup_overrides(profile) - if not result.get("skipped"): - socketio.emit("rtorrent_config_applied", {"profile_id": profile["id"], "result": result}) - except Exception as exc: - socketio.emit("rtorrent_config_applied", {"ok": False, "error": str(exc)}) + +def _rtorrent_ready(profile: dict) -> tuple[bool, str]: + """Check rTorrent before applying saved runtime overrides.""" + try: + rtorrent.client_for(profile).call("system.client_version") + return True, "" + except Exception as exc: + return False, str(exc) + + +def _apply_profile(socketio, profile: dict) -> None: + """Apply saved config only after the target rTorrent is reachable.""" + profile_id = int(profile.get("id") or 0) + if not profile_id or profile_id in _applied_profiles: + return + ok, error = _rtorrent_ready(profile) + if not ok: + _log_status(profile, "waiting", f"rTorrent config apply is waiting for connection: {error}", error=error) + return + result = rtorrent.apply_startup_overrides(profile) + if result.get("skipped"): + _applied_profiles.add(profile_id) + _log_status(profile, "skipped", "No saved rTorrent startup config overrides to apply", result=result) + return + _applied_profiles.add(profile_id) + _log_status(profile, "applied", "Saved rTorrent startup config overrides applied", result=result) + socketio.emit("rtorrent_config_applied", {"profile_id": profile_id, "result": result}) + + +def schedule_startup_config_apply(socketio, delay_seconds: int = 60, retry_seconds: int = 30, max_wait_seconds: int = 3600) -> None: + """Apply saved rTorrent UI overrides after the configured startup delay without requiring a browser.""" + global _started + with _start_lock: + if _started: + return + _started = True + + def runner() -> None: + socketio.sleep(max(0, int(delay_seconds))) + started_at = monotonic() + while True: + try: + profiles = _profiles() + for profile in profiles: + _apply_profile(socketio, profile) + pending = [int(profile.get("id") or 0) for profile in profiles if int(profile.get("id") or 0) not in _applied_profiles] + if not pending or monotonic() - started_at >= max(0, int(max_wait_seconds)): + for profile in profiles: + profile_id = int(profile.get("id") or 0) + if profile_id in pending: + _log_status(profile, "timeout", "rTorrent config startup apply stopped waiting for connection", error="startup wait timeout") + return + except Exception as exc: + operation_logs.record( + None, + "rtorrent_config_startup", + f"rTorrent startup config scheduler failed: {exc}", + severity="warning", + source="system", + action="rtorrent_config", + details={"error": str(exc)}, + ) + socketio.emit("rtorrent_config_applied", {"ok": False, "error": str(exc)}) + socketio.sleep(max(5, int(retry_seconds))) socketio.start_background_task(runner) diff --git a/pytorrent/services/tracker_cache.py b/pytorrent/services/tracker_cache.py index deaca74..8746756 100644 --- a/pytorrent/services/tracker_cache.py +++ b/pytorrent/services/tracker_cache.py @@ -438,3 +438,50 @@ def favicon_path(domain: str, enabled: bool = True, force: bool = False) -> tupl (clean, utcnow(), now, "; ".join(errors[-8:]) or "favicon not found"), ) return None, None + + +def cached_domains_for_profile(profile_id: int, limit: int = 200) -> list[str]: + """Return tracker domains already known for a profile from the summary cache.""" + # Note: The background favicon worker reads cached summary rows first, so it does not need the browser sidebar to discover domains. + domains: list[str] = [] + seen: set[str] = set() + with connect() as conn: + rows = conn.execute( + "SELECT trackers_json FROM tracker_summary_cache WHERE profile_id=? ORDER BY updated_epoch DESC LIMIT ?", + (int(profile_id), max(1, int(limit or 200))), + ).fetchall() + for row in rows: + try: + items = json.loads(row.get("trackers_json") or "[]") + except Exception: + items = [] + for item in items if isinstance(items, list) else []: + domain = tracker_domain(str((item or {}).get("url") or (item or {}).get("domain") or "")) or str((item or {}).get("domain") or "") + if domain and domain not in seen: + seen.add(domain) + domains.append(domain) + return domains[:max(1, int(limit or 200))] + + +def warm_favicon_cache(domains: list[str], enabled: bool = True, limit: int = 20, force: bool = False) -> dict: + """Warm missing or stale tracker favicons for a bounded list of domains.""" + # Note: Favicon lookup can perform network requests, so the caller must keep the batch size small. + clean_domains = [] + seen: set[str] = set() + for domain in domains or []: + clean = tracker_domain(domain) + if clean and clean not in seen: + seen.add(clean) + clean_domains.append(clean) + checked = 0 + cached = 0 + errors: list[dict] = [] + for domain in clean_domains[:max(0, int(limit or 0))]: + checked += 1 + try: + path, _mime = favicon_path(domain, enabled=enabled, force=force) + if path: + cached += 1 + except Exception as exc: + errors.append({"domain": domain, "error": str(exc)}) + return {"checked": checked, "cached": cached, "errors": errors[:10]}