Start worker #28

Merged
gru merged 2 commits from start_worker into master 2026-06-14 23:00:46 +02:00
15 changed files with 1148 additions and 280 deletions
+4
View File
@@ -143,6 +143,8 @@ def create_app() -> Flask:
register_socketio_handlers(socketio)
from .services.startup_config import schedule_startup_config_apply
schedule_startup_config_apply(socketio)
from .services.background_automations import start_scheduler as start_background_automation_scheduler
start_background_automation_scheduler(socketio)
from .services.rss import start_scheduler as start_rss_scheduler
from .services.ratio_rules import start_scheduler as start_ratio_scheduler
from .services.download_planner import start_scheduler as start_download_planner_scheduler
@@ -151,4 +153,6 @@ def create_app() -> Flask:
start_ratio_scheduler(socketio)
start_download_planner_scheduler(socketio)
start_backup_scheduler()
from .services.background_cache_warmup import start_scheduler as start_cache_warmup_scheduler
start_cache_warmup_scheduler(socketio)
return app
+1 -186
View File
@@ -52,192 +52,7 @@ def ok(payload=None):
PORT_CHECK_CACHE_SECONDS = 6 * 60 * 60
def _app_setting_get(key: str):
with connect() as conn:
row = conn.execute("SELECT value FROM app_settings WHERE key=?", (key,)).fetchone()
return row.get("value") if row else None
def _app_setting_set(key: str, value: str):
with connect() as conn:
conn.execute("INSERT OR REPLACE INTO app_settings(key,value) VALUES(?,?)", (key, value))
def _iso_from_epoch(value) -> str | None:
try:
return datetime.fromtimestamp(float(value), timezone.utc).isoformat(timespec="seconds")
except Exception:
return None
def _public_ip(profile: dict | None = None, force: bool = False) -> str:
if profile and bool(profile.get("is_remote")):
return rtorrent.remote_public_ip(profile, force=force)
req = urllib.request.Request("https://api.ipify.org", headers={"User-Agent": "pyTorrent/port-check"})
with urllib.request.urlopen(req, timeout=8) as res:
return res.read(64).decode("utf-8", "replace").strip()
MAX_PORT_CHECK_CANDIDATES = 256
def _parse_port_candidates(value: str, limit: int = MAX_PORT_CHECK_CANDIDATES) -> tuple[list[int], bool]:
"""Return valid incoming port candidates from rTorrent network.port_range.
Note: rTorrent may keep a range/list and pick a random port on start.
The old checker used only the first number, which produced false "closed"
results when another configured port was actually active.
"""
ports: list[int] = []
seen: set[int] = set()
truncated = False
def add(port: int) -> None:
nonlocal truncated
if not 1 <= port <= 65535 or port in seen:
return
if len(ports) >= limit:
truncated = True
return
seen.add(port)
ports.append(port)
for start, end in re.findall(r"(\d{1,5})\s*-\s*(\d{1,5})", value or ""):
a, b = int(start), int(end)
if a > b:
a, b = b, a
for port in range(a, b + 1):
add(port)
if truncated:
break
without_ranges = re.sub(r"\d{1,5}\s*-\s*\d{1,5}", " ", value or "")
for item in re.findall(r"\d{1,5}", without_ranges):
add(int(item))
return ports, truncated
def _incoming_ports(profile: dict) -> dict:
try:
raw_value = str(rtorrent.client_for(profile).call("network.port_range") or "")
except Exception:
raw_value = ""
ports, truncated = _parse_port_candidates(raw_value)
return {"ports": ports, "raw": raw_value, "truncated": truncated}
def _yougetsignal_check(public_ip: str, port: int) -> dict:
body = urllib.parse.urlencode({"remoteAddress": public_ip, "portNumber": str(port)}).encode("utf-8")
req = urllib.request.Request(
"https://ports.yougetsignal.com/check-port.php",
data=body,
headers={
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": "pyTorrent/port-check",
"Accept": "text/html,application/json,*/*",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=12) as res:
text = res.read(8192).decode("utf-8", "replace")
low = text.lower()
if "is open" in low:
return {"status": "open", "source": "yougetsignal", "raw": text[:500]}
if "is closed" in low:
return {"status": "closed", "source": "yougetsignal", "raw": text[:500]}
return {"status": "unknown", "source": "yougetsignal", "raw": text[:500]}
def _local_port_fallback(public_ip: str, port: int) -> dict:
try:
with socket.create_connection((public_ip, port), timeout=3):
return {"status": "open", "source": "local-fallback"}
except Exception as exc:
return {"status": "unknown", "source": "local-fallback", "error": f"Local fallback inconclusive: {exc}"}
def _check_ports(public_ip: str, ports: list[int], checker) -> dict:
checked: list[int] = []
first_closed: dict | None = None
last_result: dict = {"status": "unknown"}
for port in ports:
checked.append(port)
current = checker(public_ip, port)
last_result = current
if current.get("status") == "open":
current.update({"port": port, "open_port": port, "checked_ports": checked})
return current
if current.get("status") == "closed" and first_closed is None:
first_closed = current
result = first_closed or last_result
result.update({"port": ports[0] if ports else None, "open_port": None, "checked_ports": checked})
return result
def port_check_status(force: bool = False) -> dict:
profile = preferences.active_profile()
prefs = preferences.get_preferences()
enabled = bool((prefs or {}).get("port_check_enabled"))
if not profile:
return {"status": "unknown", "enabled": enabled, "error": "No profile"}
port_info = _incoming_ports(profile)
ports = port_info["ports"]
if not ports:
return {"status": "unknown", "enabled": enabled, "error": "Cannot read rTorrent network.port_range"}
ports_key = ",".join(str(port) for port in ports)
cache_key = f"port_check:{profile['id']}:{ports_key}:{int(bool(port_info['truncated']))}"
if not force:
cached = _app_setting_get(cache_key)
if cached:
try:
data = json.loads(cached)
if time.time() - float(data.get("checked_at_epoch") or 0) < PORT_CHECK_CACHE_SECONDS:
data["cached"] = True
data["enabled"] = enabled
if not data.get("checked_at"):
data["checked_at"] = _iso_from_epoch(data.get("checked_at_epoch"))
return data
except Exception:
pass
checked_at_epoch = time.time()
result = {
"status": "unknown",
"enabled": enabled,
"port": ports[0],
"ports": ports,
"port_range": port_info["raw"],
"ports_truncated": port_info["truncated"],
"checked_at_epoch": checked_at_epoch,
"checked_at": _iso_from_epoch(checked_at_epoch),
"cached": False,
}
try:
public_ip = _public_ip(profile, force=force)
result["public_ip"] = public_ip
result["remote"] = bool(profile.get("is_remote"))
result.update(_check_ports(public_ip, ports, _yougetsignal_check))
except Exception as exc:
result["error"] = f"YouGetSignal failed: {exc}"
try:
public_ip = result.get("public_ip") or _public_ip(profile, force=force)
result["public_ip"] = public_ip
result["remote"] = bool(profile.get("is_remote"))
result.update(_check_ports(public_ip, ports, _local_port_fallback))
except Exception as fallback_exc:
result["fallback_error"] = str(fallback_exc)
result["source"] = "none"
_app_setting_set(cache_key, json.dumps(result))
return result
from ..services.port_check import port_check_status
+5
View File
@@ -125,6 +125,11 @@ def app_status():
status["port_check"] = {"status": "disabled", "enabled": False} if not bool((prefs or {}).get("port_check_enabled")) else port_check_status(force=False)
except Exception as exc:
status["port_check"] = {"status": "error", "error": str(exc)}
try:
from ..services import background_cache_warmup
status["background_cache_warmup"] = background_cache_warmup.status()
except Exception as exc:
status["background_cache_warmup"] = {"started": False, "error": str(exc)}
status["api_ms"] = round((time.perf_counter() - started) * 1000, 2)
return ok({"status": status})
+67 -46
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
import json
import threading
from ..db import connect, default_user_id, utcnow
from . import rtorrent, auth
from .preferences import active_profile
@@ -9,6 +10,19 @@ from .workers import enqueue
AUTOMATION_JOB_CHUNK_SIZE = 100
AUTOMATION_LIGHT_ACTIONS = {'start', 'stop', 'pause', 'resume', 'set_label'}
_CHECK_LOCKS: dict[tuple[int, int | None], threading.Lock] = {}
_CHECK_LOCKS_GUARD = threading.Lock()
def _check_lock(profile_id: int, rule_id: int | None = None) -> threading.Lock:
"""Prevent overlapping automation runs for the same profile or rule."""
key = (int(profile_id), int(rule_id) if rule_id is not None else None)
with _CHECK_LOCKS_GUARD:
if key not in _CHECK_LOCKS:
_CHECK_LOCKS[key] = threading.Lock()
return _CHECK_LOCKS[key]
def _resolve_user_id(profile: dict[str, Any] | None = None, user_id: int | None = None) -> int:
@@ -457,50 +471,57 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
profile_id = int(profile['id'])
if rule_id is not None:
_require_profile_read(profile_id, user_id)
rules = _list_enabled_rules_for_profile(profile_id, rule_id=rule_id, force=force)
if not rules:
return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0}
torrents = rtorrent.list_torrents(profile)
applied = []
batches = []
now = utcnow()
planned: list[dict[str, Any]] = []
with connect() as conn:
for rule in rules:
if not force and not _cooldown_ok(conn, rule, profile_id):
continue
matched = [t for t in torrents if _conditions_match(conn, rule, profile_id, t)]
if not matched:
continue
hashes = [str(t.get('hash') or '') for t in matched if str(t.get('hash') or '')]
if hashes:
planned.append({'rule': rule, 'matched': matched, 'hashes': hashes})
for item in planned:
rule = item['rule']
matched = item['matched']
hashes = item['hashes']
owner_id = int(rule.get('user_id') or rule.get('owner_user_id') or default_user_id())
if not auth.can_write_profile(profile_id, owner_id):
batch = _record_skipped_rule(profile_id, rule, hashes, 'Rule owner no longer has write access to profile', now)
batches.append(batch)
continue
try:
actions = _apply_effects_bulk(None, profile, matched, rule.get('effects') or [], rule, owner_id)
except Exception as exc:
actions = [{'error': str(exc), 'count': len(hashes), 'target_hashes': hashes}]
changed_hashes = sorted({h for a in actions for h in (a.get('target_hashes') or [])})
if not actions or not changed_hashes:
continue
history_actions = [{k: v for k, v in a.items() if k != 'target_hashes'} for a in actions]
matched_by_hash = {str(t.get('hash') or ''): t for t in matched}
lock = _check_lock(profile_id, rule_id)
if not lock.acquire(blocking=False):
# Note: Browser, manual and background checks can now coexist without duplicate rule application.
return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0, 'skipped': True, 'reason': 'Automation check already running'}
try:
rules = _list_enabled_rules_for_profile(profile_id, rule_id=rule_id, force=force)
if not rules:
return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0}
torrents = rtorrent.list_torrents(profile)
applied = []
batches = []
now = utcnow()
planned: list[dict[str, Any]] = []
with connect() as conn:
for h in changed_hashes:
t = matched_by_hash.get(h, {})
conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_matched_at,last_applied_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_matched_at=excluded.last_matched_at, last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at', (rule['id'], profile_id, h, now, now, now))
applied.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'hash': h, 'name': t.get('name'), 'actions': [{'type': a.get('type', 'error'), 'count': a.get('count', len(changed_hashes))} for a in actions]})
_mark_rule_cooldown(conn, rule, profile_id, now)
torrent_name = str(matched_by_hash.get(changed_hashes[0], {}).get('name') or '') if len(changed_hashes) == 1 else f'{len(changed_hashes)} torrents'
torrent_hash = changed_hashes[0] if len(changed_hashes) == 1 else f'batch:{rule["id"]}:{now}'
conn.execute('INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (owner_id, profile_id, rule['id'], torrent_hash, torrent_name, str(rule.get('name') or ''), json.dumps(history_actions), now))
batches.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'count': len(changed_hashes), 'actions': history_actions})
return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied, 'batches': batches}
for rule in rules:
if not force and not _cooldown_ok(conn, rule, profile_id):
continue
matched = [t for t in torrents if _conditions_match(conn, rule, profile_id, t)]
if not matched:
continue
hashes = [str(t.get('hash') or '') for t in matched if str(t.get('hash') or '')]
if hashes:
planned.append({'rule': rule, 'matched': matched, 'hashes': hashes})
for item in planned:
rule = item['rule']
matched = item['matched']
hashes = item['hashes']
owner_id = int(rule.get('user_id') or rule.get('owner_user_id') or default_user_id())
if not auth.can_write_profile(profile_id, owner_id):
batch = _record_skipped_rule(profile_id, rule, hashes, 'Rule owner no longer has write access to profile', now)
batches.append(batch)
continue
try:
actions = _apply_effects_bulk(None, profile, matched, rule.get('effects') or [], rule, owner_id)
except Exception as exc:
actions = [{'error': str(exc), 'count': len(hashes), 'target_hashes': hashes}]
changed_hashes = sorted({h for a in actions for h in (a.get('target_hashes') or [])})
if not actions or not changed_hashes:
continue
history_actions = [{k: v for k, v in a.items() if k != 'target_hashes'} for a in actions]
matched_by_hash = {str(t.get('hash') or ''): t for t in matched}
with connect() as conn:
for h in changed_hashes:
t = matched_by_hash.get(h, {})
conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_matched_at,last_applied_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_matched_at=excluded.last_matched_at, last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at', (rule['id'], profile_id, h, now, now, now))
applied.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'hash': h, 'name': t.get('name'), 'actions': [{'type': a.get('type', 'error'), 'count': a.get('count', len(changed_hashes))} for a in actions]})
_mark_rule_cooldown(conn, rule, profile_id, now)
torrent_name = str(matched_by_hash.get(changed_hashes[0], {}).get('name') or '') if len(changed_hashes) == 1 else f'{len(changed_hashes)} torrents'
torrent_hash = changed_hashes[0] if len(changed_hashes) == 1 else f'batch:{rule["id"]}:{now}'
conn.execute('INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (owner_id, profile_id, rule['id'], torrent_hash, torrent_name, str(rule.get('name') or ''), json.dumps(history_actions), now))
batches.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'count': len(changed_hashes), 'actions': history_actions})
return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied, 'batches': batches}
finally:
lock.release()
@@ -0,0 +1,146 @@
from __future__ import annotations
import os
import threading
import time
from typing import Any
from ..db import connect, default_user_id
from . import automation_rules, operation_logs, poller_control, rtorrent
from .websocket import emit_profile_event
_started = False
_start_lock = threading.Lock()
_profile_locks: dict[int, threading.Lock] = {}
_profile_locks_lock = threading.Lock()
_last_logged_status: dict[int, str] = {}
def _configured_interval() -> float:
"""Return the minimum background automation interval from environment settings."""
try:
return max(5.0, min(3600.0, float(os.environ.get("PYTORRENT_AUTOMATION_BACKGROUND_INTERVAL_SECONDS", "15"))))
except Exception:
return 15.0
def _profiles() -> list[dict[str, Any]]:
"""Read configured profiles without relying on a browser session."""
with connect() as conn:
return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()]
def _profile_lock(profile_id: int) -> threading.Lock:
"""Keep one automation pass per profile active at a time."""
with _profile_locks_lock:
if profile_id not in _profile_locks:
_profile_locks[profile_id] = threading.Lock()
return _profile_locks[profile_id]
def _owner_user_id(profile: dict[str, Any]) -> int:
"""Use the profile owner for background checks so rule permissions stay stable."""
return int(profile.get("user_id") or default_user_id())
def _profile_interval(profile_id: int) -> float:
"""Reuse the existing queue poller cadence instead of adding another UI setting."""
settings = poller_control.get_settings(profile_id)
return max(_configured_interval(), float(settings.get("queue_stats_interval_seconds") or 15.0))
def _connected(profile: dict[str, Any]) -> tuple[bool, str]:
"""Verify rTorrent connectivity before running automation logic."""
try:
rtorrent.client_for(profile).call("system.client_version")
return True, ""
except Exception as exc:
return False, str(exc)
def _log_status(profile_id: int, status: str, message: str, *, error: str = "") -> None:
"""Log only connectivity state changes to avoid noisy system logs."""
if _last_logged_status.get(profile_id) == status:
return
_last_logged_status[profile_id] = status
severity = "warning" if error else "info"
operation_logs.record(
profile_id,
"background_automation_status",
message,
severity=severity,
source="system",
action="background_automation",
details={"status": status, "error": error},
)
def _run_profile(socketio, profile: dict[str, Any]) -> None:
"""Run one safe background automation pass for a connected profile."""
profile_id = int(profile.get("id") or 0)
if not profile_id:
return
lock = _profile_lock(profile_id)
if not lock.acquire(blocking=False):
return
try:
ok, error = _connected(profile)
if not ok:
_log_status(profile_id, "disconnected", f"Background automations waiting for rTorrent: {error}", error=error)
return
_log_status(profile_id, "connected", "Background automations detected a working rTorrent connection")
result = automation_rules.check(profile, user_id=_owner_user_id(profile), force=False)
if result.get("applied") or result.get("batches"):
operation_logs.record(
profile_id,
"background_automation_run",
"Background automations applied matching rules",
source="system",
action="background_automation",
details={"applied": len(result.get("applied") or []), "batches": len(result.get("batches") or []), "result": result},
user_id=_owner_user_id(profile),
)
emit_profile_event(socketio, "automation_update", result, profile_id)
except Exception as exc:
operation_logs.record(
profile_id,
"background_automation_error",
f"Background automation check failed: {exc}",
severity="warning",
source="system",
action="background_automation",
details={"error": str(exc)},
user_id=_owner_user_id(profile),
)
finally:
lock.release()
def start_scheduler(socketio) -> None:
"""Start browser-independent automation checks once per application process."""
global _started
with _start_lock:
if _started:
return
_started = True
def runner() -> None:
last_run: dict[int, float] = {}
while True:
started = time.monotonic()
next_sleep = _configured_interval()
for profile in _profiles():
profile_id = int(profile.get("id") or 0)
if not profile_id:
continue
interval = _profile_interval(profile_id)
elapsed = started - float(last_run.get(profile_id) or 0.0)
if elapsed < interval:
next_sleep = min(next_sleep, max(1.0, interval - elapsed))
continue
last_run[profile_id] = started
_run_profile(socketio, profile)
next_sleep = min(next_sleep, interval)
socketio.sleep(max(1.0, next_sleep))
socketio.start_background_task(runner)
@@ -0,0 +1,210 @@
from __future__ import annotations
import os
import threading
import time
from typing import Any
from ..db import connect, default_user_id
from . import port_check, preferences, rtorrent, tracker_cache
from .torrent_cache import torrent_cache
STARTUP_DELAY_SECONDS = 60
DEFAULT_TRACKER_INTERVAL_SECONDS = 15 * 60
DEFAULT_PORT_INTERVAL_SECONDS = port_check.PORT_CHECK_CACHE_SECONDS
FAVICON_BATCH_SIZE = 20
_started = False
_start_lock = threading.Lock()
_status_lock = threading.Lock()
_status: dict[str, Any] = {
"started": False,
"tracker_warmup": {},
"port_check": {},
}
def _setting_float(name: str, default: float, minimum: float, maximum: float) -> float:
"""Read a bounded worker interval from the environment."""
# Note: Defaults keep the worker light while still making UI-independent caches fresh after startup.
try:
value = float(os.environ.get(name, str(default)))
except Exception:
value = default
return max(minimum, min(maximum, value))
def _profiles() -> list[dict[str, Any]]:
"""Read every rTorrent profile directly from the database."""
# Note: The worker cannot rely on active browser session state, so it iterates real configured profiles.
with connect() as conn:
return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()]
def _owner_user_id(profile: dict[str, Any]) -> int:
"""Return the profile owner used for profile-scoped preferences."""
return int(profile.get("user_id") or default_user_id())
def _connected(profile: dict[str, Any]) -> tuple[bool, str]:
"""Check rTorrent connectivity without changing user state."""
try:
rtorrent.client_for(profile).call("system.client_version")
return True, ""
except Exception as exc:
return False, str(exc)
def _remember(section: str, profile_id: int, payload: dict[str, Any]) -> None:
"""Store lightweight in-memory diagnostics for app/status."""
# Note: Cache warmups are not user operations, so they stay out of operation logs by default.
with _status_lock:
data = dict(_status.get(section) or {})
data[str(profile_id)] = {**payload, "updated_at_epoch": time.time()}
_status[section] = data
def status() -> dict[str, Any]:
"""Return current worker diagnostics for system status endpoints."""
with _status_lock:
return {
"started": bool(_status.get("started")),
"startup_delay_seconds": STARTUP_DELAY_SECONDS,
"tracker_warmup": dict(_status.get("tracker_warmup") or {}),
"port_check": dict(_status.get("port_check") or {}),
}
def _tracker_domains_from_rows(rows: list[dict[str, Any]], summary: dict[str, Any], profile_id: int) -> list[str]:
"""Build a bounded tracker domain list from fresh summary data and cached rows."""
domains = [str(item.get("domain") or "") for item in summary.get("trackers") or []]
if not domains:
domains = tracker_cache.cached_domains_for_profile(profile_id, limit=200)
return domains
def _warm_tracker_profile(profile: dict[str, Any]) -> None:
"""Warm tracker summary cache and optional favicon cache for one profile."""
# Note: This mirrors the sidebar warmup, but runs from the backend scheduler instead of waiting for the filter panel.
profile_id = int(profile.get("id") or 0)
if not profile_id:
return
ok, error = _connected(profile)
if not ok:
_remember("tracker_warmup", profile_id, {"ok": False, "skipped": True, "reason": "rtorrent_disconnected", "error": error})
return
owner_id = _owner_user_id(profile)
prefs = preferences.get_preferences(owner_id, profile_id)
rows = torrent_cache.snapshot(profile_id)
if not rows:
torrent_cache.refresh(profile)
rows = torrent_cache.snapshot(profile_id)
hashes = [str(row.get("hash") or "") for row in rows if row.get("hash")]
if not hashes:
_remember("tracker_warmup", profile_id, {"ok": True, "skipped": True, "reason": "no_torrents"})
return
loader = lambda h: rtorrent.torrent_trackers(profile, h)
summary = tracker_cache.summary(profile, hashes, loader, scan_limit=tracker_cache.TRACKER_SCAN_LIMIT, include_favicons=False)
warming = False
if int(summary.get("pending") or 0) > 0:
warming = tracker_cache.warm_summary_cache(profile, hashes, loader, batch_size=tracker_cache.TRACKER_SCAN_LIMIT)
favicon_result = {"checked": 0, "cached": 0, "errors": []}
if bool((prefs or {}).get("tracker_favicons_enabled")):
domains = _tracker_domains_from_rows(rows, summary, profile_id)
favicon_result = tracker_cache.warm_favicon_cache(domains, enabled=True, limit=FAVICON_BATCH_SIZE, force=False)
_remember(
"tracker_warmup",
profile_id,
{
"ok": True,
"hashes": len(hashes),
"pending": int(summary.get("pending") or 0),
"scanned_now": int(summary.get("scanned_now") or 0),
"warming": bool(warming),
"favicons_enabled": bool((prefs or {}).get("tracker_favicons_enabled")),
"favicons": favicon_result,
},
)
def _check_port_profile(profile: dict[str, Any]) -> None:
"""Refresh incoming-port status when the profile preference enables it."""
# Note: force=False respects the existing six-hour cache and avoids unnecessary external checks.
profile_id = int(profile.get("id") or 0)
if not profile_id:
return
owner_id = _owner_user_id(profile)
prefs = preferences.get_preferences(owner_id, profile_id)
if not bool((prefs or {}).get("port_check_enabled")):
_remember("port_check", profile_id, {"ok": True, "enabled": False, "skipped": True, "reason": "disabled"})
return
result = port_check.port_check_status(profile=profile, force=False, user_id=owner_id)
_remember(
"port_check",
profile_id,
{
"ok": not bool(result.get("error") and result.get("source") == "none"),
"enabled": True,
"status": result.get("status"),
"cached": bool(result.get("cached")),
"checked_at": result.get("checked_at"),
"error": result.get("error") or result.get("fallback_error") or "",
},
)
def start_scheduler(socketio=None) -> None:
"""Start browser-independent cache warmup and port-check scheduler."""
global _started
with _start_lock:
if _started:
return
_started = True
with _status_lock:
_status["started"] = True
tracker_interval = _setting_float("PYTORRENT_CACHE_WARMUP_INTERVAL_SECONDS", DEFAULT_TRACKER_INTERVAL_SECONDS, 60.0, 24 * 60 * 60.0)
port_interval = _setting_float("PYTORRENT_PORT_CHECK_INTERVAL_SECONDS", DEFAULT_PORT_INTERVAL_SECONDS, 60.0, 24 * 60 * 60.0)
def runner() -> None:
time.sleep(STARTUP_DELAY_SECONDS)
last_tracker: dict[int, float] = {}
last_port: dict[int, float] = {}
while True:
now = time.monotonic()
next_sleep = 60.0
for profile in _profiles():
profile_id = int(profile.get("id") or 0)
if not profile_id:
continue
if now - float(last_tracker.get(profile_id) or 0.0) >= tracker_interval:
last_tracker[profile_id] = now
try:
_warm_tracker_profile(profile)
except Exception as exc:
_remember("tracker_warmup", profile_id, {"ok": False, "error": str(exc)})
if now - float(last_port.get(profile_id) or 0.0) >= port_interval:
last_port[profile_id] = now
try:
_check_port_profile(profile)
except Exception as exc:
_remember("port_check", profile_id, {"ok": False, "error": str(exc)})
next_sleep = min(
next_sleep,
max(1.0, tracker_interval - (time.monotonic() - float(last_tracker.get(profile_id) or 0.0))),
max(1.0, port_interval - (time.monotonic() - float(last_port.get(profile_id) or 0.0))),
)
sleep_for = max(5.0, min(60.0, next_sleep))
if socketio:
socketio.sleep(sleep_for)
else:
time.sleep(sleep_for)
if socketio:
socketio.start_background_task(runner)
else:
threading.Thread(target=runner, daemon=True, name="pytorrent-cache-warmup-scheduler").start()
+41 -1
View File
@@ -8,7 +8,10 @@ from typing import Any
import psutil
from ..db import connect, default_user_id, utcnow
from . import auth, rtorrent
from . import auth, operation_logs, rtorrent
PLANNER_STARTUP_DELAY_SECONDS = 60
_APP_STARTED_AT = time.monotonic()
DEFAULTS = {
"enabled": False,
@@ -45,6 +48,34 @@ DEFAULTS = {
_LAST_RUN: dict[int, float] = {}
_LAST_LIMITS: dict[int, tuple[int, int]] = {}
_HIGH_CPU_SINCE: dict[int, float] = {}
_PLANNER_CONNECTION_STATUS: dict[int, str] = {}
def _rtorrent_ready(profile: dict) -> tuple[bool, str]:
"""Check rTorrent connectivity before the planner evaluates or applies changes."""
try:
rtorrent.client_for(profile).call("system.client_version")
return True, ""
except Exception as exc:
return False, str(exc)
def _log_connection_status(profile: dict, status: str, message: str, *, error: str = "", user_id: int | None = None) -> None:
"""Record planner connectivity state changes as system operations without noisy repeats."""
profile_id = int(profile.get("id") or 0)
if _PLANNER_CONNECTION_STATUS.get(profile_id) == status:
return
_PLANNER_CONNECTION_STATUS[profile_id] = status
operation_logs.record(
profile_id,
"download_planner_status",
message,
severity="warning" if error else "info",
source="system",
action="download_planner",
details={"status": status, "error": error},
user_id=user_id or int(profile.get("user_id") or 0) or None,
)
def _bool(value: Any) -> bool:
@@ -471,11 +502,20 @@ def enforce(profile: dict, force: bool = False, user_id: int | None = None) -> d
return {"ok": True, "enabled": False, "profile_id": profile_id, "skipped": True, "reason": "planner owner has no write access", "history": history(profile_id, 20), "history_total": history_count(profile_id)}
if not settings.get("enabled"):
return {"ok": True, "enabled": False, "profile_id": profile_id, "history": history(profile_id, 20), "history_total": history_count(profile_id), "preview": preview(profile, user_id=user_id)}
startup_remaining = int(PLANNER_STARTUP_DELAY_SECONDS - (time.monotonic() - _APP_STARTED_AT))
if not force and startup_remaining > 0:
# Note: The background planner keeps the same startup grace as rTorrent config apply, while manual checks still run immediately.
return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True, "reason": "startup_delay", "retry_after_seconds": startup_remaining}
now = time.monotonic()
interval = int(settings.get("check_interval_seconds") or 30)
if not force and now - _LAST_RUN.get(profile_id, 0) < interval:
return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True}
_LAST_RUN[profile_id] = now
ready, connection_error = _rtorrent_ready(profile)
if not ready:
_log_connection_status(profile, "waiting", f"Download Planner is waiting for rTorrent: {connection_error}", error=connection_error, user_id=user_id)
return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True, "reason": "rtorrent_unavailable", "error": connection_error, "retry_after_seconds": interval}
_log_connection_status(profile, "connected", "Download Planner detected a working rTorrent connection", user_id=user_id)
decision = evaluate(profile, settings)
result: dict[str, Any] = {"ok": True, "enabled": True, **decision, "limits_changed": False, "paused": 0, "resumed": 0}
wanted_limits = (int(decision["down"]), int(decision["up"]))
+195
View File
@@ -0,0 +1,195 @@
from __future__ import annotations
import json
import re
import socket
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any
from ..db import connect
from . import preferences, rtorrent
PORT_CHECK_CACHE_SECONDS = 6 * 60 * 60
MAX_PORT_CHECK_CANDIDATES = 256
def _app_setting_get(key: str) -> str | None:
with connect() as conn:
row = conn.execute("SELECT value FROM app_settings WHERE key=?", (key,)).fetchone()
return row.get("value") if row else None
def _app_setting_set(key: str, value: str) -> None:
with connect() as conn:
conn.execute("INSERT OR REPLACE INTO app_settings(key,value) VALUES(?,?)", (key, value))
def _iso_from_epoch(value: Any) -> str | None:
try:
return datetime.fromtimestamp(float(value), timezone.utc).isoformat(timespec="seconds")
except Exception:
return None
def _public_ip(profile: dict | None = None, force: bool = False) -> str:
if profile and bool(profile.get("is_remote")):
return rtorrent.remote_public_ip(profile, force=force)
req = urllib.request.Request("https://api.ipify.org", headers={"User-Agent": "pyTorrent/port-check"})
with urllib.request.urlopen(req, timeout=8) as res:
return res.read(64).decode("utf-8", "replace").strip()
def _parse_port_candidates(value: str, limit: int = MAX_PORT_CHECK_CANDIDATES) -> tuple[list[int], bool]:
"""Return valid incoming port candidates from rTorrent network.port_range."""
# Note: rTorrent can keep a range/list and pick a random port on start, so the checker tests all safe candidates.
ports: list[int] = []
seen: set[int] = set()
truncated = False
def add(port: int) -> None:
nonlocal truncated
if not 1 <= port <= 65535 or port in seen:
return
if len(ports) >= limit:
truncated = True
return
seen.add(port)
ports.append(port)
for start, end in re.findall(r"(\d{1,5})\s*-\s*(\d{1,5})", value or ""):
a, b = int(start), int(end)
if a > b:
a, b = b, a
for port in range(a, b + 1):
add(port)
if truncated:
break
without_ranges = re.sub(r"\d{1,5}\s*-\s*\d{1,5}", " ", value or "")
for item in re.findall(r"\d{1,5}", without_ranges):
add(int(item))
return ports, truncated
def _incoming_ports(profile: dict) -> dict:
try:
raw_value = str(rtorrent.client_for(profile).call("network.port_range") or "")
except Exception:
raw_value = ""
ports, truncated = _parse_port_candidates(raw_value)
return {"ports": ports, "raw": raw_value, "truncated": truncated}
def _yougetsignal_check(public_ip: str, port: int) -> dict:
body = urllib.parse.urlencode({"remoteAddress": public_ip, "portNumber": str(port)}).encode("utf-8")
req = urllib.request.Request(
"https://ports.yougetsignal.com/check-port.php",
data=body,
headers={
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": "pyTorrent/port-check",
"Accept": "text/html,application/json,*/*",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=12) as res:
text = res.read(8192).decode("utf-8", "replace")
low = text.lower()
if "is open" in low:
return {"status": "open", "source": "yougetsignal", "raw": text[:500]}
if "is closed" in low:
return {"status": "closed", "source": "yougetsignal", "raw": text[:500]}
return {"status": "unknown", "source": "yougetsignal", "raw": text[:500]}
def _local_port_fallback(public_ip: str, port: int) -> dict:
try:
with socket.create_connection((public_ip, port), timeout=3):
return {"status": "open", "source": "local-fallback"}
except Exception as exc:
return {"status": "unknown", "source": "local-fallback", "error": f"Local fallback inconclusive: {exc}"}
def _check_ports(public_ip: str, ports: list[int], checker) -> dict:
checked: list[int] = []
first_closed: dict | None = None
last_result: dict = {"status": "unknown"}
for port in ports:
checked.append(port)
current = checker(public_ip, port)
last_result = current
if current.get("status") == "open":
current.update({"port": port, "open_port": port, "checked_ports": checked})
return current
if current.get("status") == "closed" and first_closed is None:
first_closed = current
result = first_closed or last_result
result.update({"port": ports[0] if ports else None, "open_port": None, "checked_ports": checked})
return result
def port_check_status(profile: dict | None = None, force: bool = False, user_id: int | None = None) -> dict:
"""Return cached or freshly checked incoming-port status for one rTorrent profile."""
# Note: This service is shared by UI routes and the background worker, so browser startup is not required.
profile = profile or preferences.active_profile(user_id)
prefs = preferences.get_preferences(user_id, int(profile.get("id"))) if profile else preferences.get_preferences(user_id)
enabled = bool((prefs or {}).get("port_check_enabled"))
if not profile:
return {"status": "unknown", "enabled": enabled, "error": "No profile"}
port_info = _incoming_ports(profile)
ports = port_info["ports"]
if not ports:
return {"status": "unknown", "enabled": enabled, "error": "Cannot read rTorrent network.port_range"}
ports_key = ",".join(str(port) for port in ports)
cache_key = f"port_check:{profile['id']}:{ports_key}:{int(bool(port_info['truncated']))}"
if not force:
cached = _app_setting_get(cache_key)
if cached:
try:
data = json.loads(cached)
if time.time() - float(data.get("checked_at_epoch") or 0) < PORT_CHECK_CACHE_SECONDS:
data["cached"] = True
data["enabled"] = enabled
if not data.get("checked_at"):
data["checked_at"] = _iso_from_epoch(data.get("checked_at_epoch"))
return data
except Exception:
pass
checked_at_epoch = time.time()
result = {
"status": "unknown",
"enabled": enabled,
"port": ports[0],
"ports": ports,
"port_range": port_info["raw"],
"ports_truncated": port_info["truncated"],
"checked_at_epoch": checked_at_epoch,
"checked_at": _iso_from_epoch(checked_at_epoch),
"cached": False,
}
try:
public_ip = _public_ip(profile, force=force)
result["public_ip"] = public_ip
result["remote"] = bool(profile.get("is_remote"))
result.update(_check_ports(public_ip, ports, _yougetsignal_check))
except Exception as exc:
result["error"] = f"YouGetSignal failed: {exc}"
try:
public_ip = result.get("public_ip") or _public_ip(profile, force=force)
result["public_ip"] = public_ip
result["remote"] = bool(profile.get("is_remote"))
result.update(_check_ports(public_ip, ports, _local_port_fallback))
except Exception as fallback_exc:
result["fallback_error"] = str(fallback_exc)
result["source"] = "none"
_app_setting_set(cache_key, json.dumps(result))
return result
+93 -16
View File
@@ -1,26 +1,103 @@
from __future__ import annotations
from time import sleep
from . import preferences, rtorrent
import threading
from time import monotonic
from ..db import connect
from . import operation_logs, rtorrent
_started = False
_start_lock = threading.Lock()
_applied_profiles: set[int] = set()
_last_status: dict[int, str] = {}
def schedule_startup_config_apply(socketio, delay_seconds: int = 60) -> None:
"""Apply saved rTorrent UI overrides after pyTorrent has been running for a moment."""
global _started
if _started:
def _profiles() -> list[dict]:
"""Read all configured profiles because startup work has no browser user session."""
with connect() as conn:
return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()]
def _log_status(profile: dict, status: str, message: str, *, error: str = "", result: dict | None = None) -> None:
"""Write meaningful startup config state changes as system operations."""
profile_id = int(profile.get("id") or 0)
if status in {"waiting", "skipped"} and _last_status.get(profile_id) == status:
return
_started = True
_last_status[profile_id] = status
operation_logs.record(
profile_id,
"rtorrent_config_startup",
message,
severity="warning" if error else "info",
source="system",
action="rtorrent_config",
details={"status": status, "error": error, "result": result or {}},
user_id=int(profile.get("user_id") or 0) or None,
)
def runner():
sleep(max(0, int(delay_seconds)))
try:
for profile in preferences.list_profiles():
result = rtorrent.apply_startup_overrides(profile)
if not result.get("skipped"):
socketio.emit("rtorrent_config_applied", {"profile_id": profile["id"], "result": result})
except Exception as exc:
socketio.emit("rtorrent_config_applied", {"ok": False, "error": str(exc)})
def _rtorrent_ready(profile: dict) -> tuple[bool, str]:
"""Check rTorrent before applying saved runtime overrides."""
try:
rtorrent.client_for(profile).call("system.client_version")
return True, ""
except Exception as exc:
return False, str(exc)
def _apply_profile(socketio, profile: dict) -> None:
"""Apply saved config only after the target rTorrent is reachable."""
profile_id = int(profile.get("id") or 0)
if not profile_id or profile_id in _applied_profiles:
return
ok, error = _rtorrent_ready(profile)
if not ok:
_log_status(profile, "waiting", f"rTorrent config apply is waiting for connection: {error}", error=error)
return
result = rtorrent.apply_startup_overrides(profile)
if result.get("skipped"):
_applied_profiles.add(profile_id)
_log_status(profile, "skipped", "No saved rTorrent startup config overrides to apply", result=result)
return
_applied_profiles.add(profile_id)
_log_status(profile, "applied", "Saved rTorrent startup config overrides applied", result=result)
socketio.emit("rtorrent_config_applied", {"profile_id": profile_id, "result": result})
def schedule_startup_config_apply(socketio, delay_seconds: int = 60, retry_seconds: int = 30, max_wait_seconds: int = 3600) -> None:
"""Apply saved rTorrent UI overrides after the configured startup delay without requiring a browser."""
global _started
with _start_lock:
if _started:
return
_started = True
def runner() -> None:
socketio.sleep(max(0, int(delay_seconds)))
started_at = monotonic()
while True:
try:
profiles = _profiles()
for profile in profiles:
_apply_profile(socketio, profile)
pending = [int(profile.get("id") or 0) for profile in profiles if int(profile.get("id") or 0) not in _applied_profiles]
if not pending or monotonic() - started_at >= max(0, int(max_wait_seconds)):
for profile in profiles:
profile_id = int(profile.get("id") or 0)
if profile_id in pending:
_log_status(profile, "timeout", "rTorrent config startup apply stopped waiting for connection", error="startup wait timeout")
return
except Exception as exc:
operation_logs.record(
None,
"rtorrent_config_startup",
f"rTorrent startup config scheduler failed: {exc}",
severity="warning",
source="system",
action="rtorrent_config",
details={"error": str(exc)},
)
socketio.emit("rtorrent_config_applied", {"ok": False, "error": str(exc)})
socketio.sleep(max(5, int(retry_seconds)))
socketio.start_background_task(runner)
+47
View File
@@ -438,3 +438,50 @@ def favicon_path(domain: str, enabled: bool = True, force: bool = False) -> tupl
(clean, utcnow(), now, "; ".join(errors[-8:]) or "favicon not found"),
)
return None, None
def cached_domains_for_profile(profile_id: int, limit: int = 200) -> list[str]:
"""Return tracker domains already known for a profile from the summary cache."""
# Note: The background favicon worker reads cached summary rows first, so it does not need the browser sidebar to discover domains.
domains: list[str] = []
seen: set[str] = set()
with connect() as conn:
rows = conn.execute(
"SELECT trackers_json FROM tracker_summary_cache WHERE profile_id=? ORDER BY updated_epoch DESC LIMIT ?",
(int(profile_id), max(1, int(limit or 200))),
).fetchall()
for row in rows:
try:
items = json.loads(row.get("trackers_json") or "[]")
except Exception:
items = []
for item in items if isinstance(items, list) else []:
domain = tracker_domain(str((item or {}).get("url") or (item or {}).get("domain") or "")) or str((item or {}).get("domain") or "")
if domain and domain not in seen:
seen.add(domain)
domains.append(domain)
return domains[:max(1, int(limit or 200))]
def warm_favicon_cache(domains: list[str], enabled: bool = True, limit: int = 20, force: bool = False) -> dict:
"""Warm missing or stale tracker favicons for a bounded list of domains."""
# Note: Favicon lookup can perform network requests, so the caller must keep the batch size small.
clean_domains = []
seen: set[str] = set()
for domain in domains or []:
clean = tracker_domain(domain)
if clean and clean not in seen:
seen.add(clean)
clean_domains.append(clean)
checked = 0
cached = 0
errors: list[dict] = []
for domain in clean_domains[:max(0, int(limit or 0))]:
checked += 1
try:
path, _mime = favicon_path(domain, enabled=enabled, force=force)
if path:
cached += 1
except Exception as exc:
errors.append({"domain": domain, "error": str(exc)})
return {"checked": checked, "cached": cached, "errors": errors[:10]}
+77 -12
View File
@@ -1,9 +1,12 @@
#!/usr/bin/env python3
from __future__ import annotations
import os
import re
import time
from pathlib import Path
from urllib.parse import urljoin, urlparse
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
ROOT = Path(__file__).resolve().parents[1]
@@ -31,6 +34,40 @@ GOOGLE_FONT_FAMILIES = (
"Source Sans 3",
)
GOOGLE_FONT_WEIGHTS = "400;500;600;700;800"
DOWNLOAD_RETRIES = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRIES", "4"))
DOWNLOAD_RETRY_DELAY = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRY_DELAY", "10"))
DOWNLOAD_TIMEOUT = int(os.environ.get("PYTORRENT_DOWNLOAD_TIMEOUT", "180"))
def retry_countdown(seconds: int) -> None:
for remaining in range(seconds, 0, -1):
print(f"Retrying in {remaining}s...", end="\r", flush=True)
time.sleep(1)
if seconds > 0:
print(" " * 40, end="\r", flush=True)
def candidate_urls(url: str) -> list[str]:
candidates = [url]
replacements = (
("https://cdn.jsdelivr.net/npm/bootstrap@", "https://unpkg.com/bootstrap@"),
("https://cdn.jsdelivr.net/npm/bootswatch@", "https://unpkg.com/bootswatch@"),
("https://cdn.jsdelivr.net/npm/swagger-ui-dist@", "https://unpkg.com/swagger-ui-dist@"),
("https://cdn.jsdelivr.net/gh/lipis/flag-icons@", "https://cdn.jsdelivr.net/npm/flag-icons@"),
("https://cdn.jsdelivr.net/gh/DevExpress/bootstrap-themes@master/", "https://raw.githubusercontent.com/DevExpress/bootstrap-themes/master/"),
("https://cdn.socket.io/", "https://cdnjs.cloudflare.com/ajax/libs/socket.io/"),
("https://cdnjs.cloudflare.com/ajax/libs/font-awesome/", "https://cdn.jsdelivr.net/npm/@fortawesome/fontawesome-free@"),
)
for old, new in replacements:
if url.startswith(old):
candidates.append(url.replace(old, new, 1))
# font-awesome has a different path layout on npm/jsDelivr.
candidates = [item.replace("/css/all.min.css", "/css/all.min.css") for item in candidates]
unique = []
for item in candidates:
if item not in unique:
unique.append(item)
return unique
def google_fonts_css_url() -> str:
@@ -147,15 +184,31 @@ def bootstrap_css_asset(theme: str) -> dict[str, str]:
def download(url: str, dest: Path) -> None:
dest.parent.mkdir(parents=True, exist_ok=True)
req = Request(url, headers={"User-Agent": "pyTorrent installer"})
with urlopen(req, timeout=60) as response:
data = response.read()
if not data:
raise RuntimeError(f"Empty response for {url}")
tmp = dest.with_suffix(dest.suffix + ".tmp")
tmp.write_bytes(data)
tmp.replace(dest)
print(f"OK {dest.relative_to(ROOT)}")
last_error: Exception | None = None
for candidate in candidate_urls(url):
for attempt in range(1, DOWNLOAD_RETRIES + 1):
try:
req = Request(candidate, headers={"User-Agent": "pyTorrent installer"})
with urlopen(req, timeout=DOWNLOAD_TIMEOUT) as response:
data = response.read()
if not data:
raise RuntimeError(f"Empty response for {candidate}")
tmp = dest.with_suffix(dest.suffix + ".tmp")
tmp.write_bytes(data)
tmp.replace(dest)
if candidate != url:
print(f"OK {dest.relative_to(ROOT)} from fallback {candidate}")
else:
print(f"OK {dest.relative_to(ROOT)}")
return
except (HTTPError, URLError, TimeoutError, OSError, RuntimeError) as exc:
last_error = exc
print(f"Download failed ({attempt}/{DOWNLOAD_RETRIES}) for {candidate}: {exc}")
if attempt < DOWNLOAD_RETRIES:
retry_countdown(DOWNLOAD_RETRY_DELAY)
if candidate != candidate_urls(url)[-1]:
print(f"Trying alternative source: {candidate_urls(url)[candidate_urls(url).index(candidate) + 1]}")
raise RuntimeError(f"Failed to download {url}: {last_error}")
def download_css_with_assets(url: str, dest: Path) -> None:
@@ -184,10 +237,22 @@ def download_google_fonts_css(url: str, dest: Path) -> None:
"Accept": "text/css,*/*;q=0.1",
},
)
with urlopen(req, timeout=60) as response:
css = response.read().decode("utf-8", errors="ignore")
last_error: Exception | None = None
css = ""
for attempt in range(1, DOWNLOAD_RETRIES + 1):
try:
with urlopen(req, timeout=DOWNLOAD_TIMEOUT) as response:
css = response.read().decode("utf-8", errors="ignore")
if not css.strip():
raise RuntimeError(f"Empty response for {url}")
break
except (HTTPError, URLError, TimeoutError, OSError, RuntimeError) as exc:
last_error = exc
print(f"Download failed ({attempt}/{DOWNLOAD_RETRIES}) for {url}: {exc}")
if attempt < DOWNLOAD_RETRIES:
retry_countdown(DOWNLOAD_RETRY_DELAY)
if not css.strip():
raise RuntimeError(f"Empty response for {url}")
raise RuntimeError(f"Failed to download {url}: {last_error}")
def replace_url(match: re.Match[str]) -> str:
quote = match.group(1) or ""
+62 -6
View File
@@ -24,6 +24,10 @@ REPO_URL="${PYTORRENT_REPO_URL:-https://github.com/zdzichu6969/pyTorrent}"
REPO_BRANCH="${PYTORRENT_REPO_BRANCH:-master}"
WORK_DIR="${PYTORRENT_BOOTSTRAP_DIR:-/tmp/pytorrent-only-installer}"
KEEP_WORK_DIR="${PYTORRENT_KEEP_BOOTSTRAP_DIR:-0}"
DOWNLOAD_RETRIES="${PYTORRENT_DOWNLOAD_RETRIES:-4}"
DOWNLOAD_RETRY_DELAY="${PYTORRENT_DOWNLOAD_RETRY_DELAY:-10}"
DOWNLOAD_CONNECT_TIMEOUT="${PYTORRENT_DOWNLOAD_CONNECT_TIMEOUT:-30}"
DOWNLOAD_MAX_TIME="${PYTORRENT_DOWNLOAD_MAX_TIME:-600}"
default_archive_url() {
case "${REPO_URL%/}" in
@@ -61,13 +65,65 @@ prepare_downloader() {
fail "curl or wget is required."
}
retry_countdown() {
local seconds="$1"
local remaining
for ((remaining=seconds; remaining>0; remaining--)); do
printf 'Retrying in %ss...\r' "${remaining}"
sleep 1
done
[[ "${seconds}" -gt 0 ]] && printf '%*s\r' 40 ''
}
archive_url_candidates() {
local url="$1"
printf '%s\n' "${url}"
case "${url}" in
https://github.com/*/archive/refs/heads/*.tar.gz)
local rest owner repo branch
rest="${url#https://github.com/}"
owner="${rest%%/*}"
rest="${rest#*/}"
repo="${rest%%/*}"
branch="${url##*/}"
branch="${branch%.tar.gz}"
printf 'https://codeload.github.com/%s/%s/tar.gz/refs/heads/%s\n' "${owner}" "${repo}" "${branch}"
;;
https://github.com/*/archive/*.tar.gz)
local rest owner repo ref
rest="${url#https://github.com/}"
owner="${rest%%/*}"
rest="${rest#*/}"
repo="${rest%%/*}"
ref="${url##*/}"
ref="${ref%.tar.gz}"
printf 'https://codeload.github.com/%s/%s/tar.gz/%s\n' "${owner}" "${repo}" "${ref}"
;;
esac
}
download_file() {
local url="$1" destination="$2"
if [[ "${DOWNLOADER}" == "curl" ]]; then
curl -fL "${url}" -o "${destination}"
else
wget -O "${destination}" "${url}"
fi
local url="$1"
local destination="$2"
local candidate attempt status
while IFS= read -r candidate; do
[[ -n "${candidate}" ]] || continue
for ((attempt=1; attempt<=DOWNLOAD_RETRIES; attempt++)); do
if [[ "${DOWNLOADER}" == "curl" ]]; then
curl -fL --connect-timeout "${DOWNLOAD_CONNECT_TIMEOUT}" --max-time "${DOWNLOAD_MAX_TIME}" "${candidate}" -o "${destination}" && return 0
status=$?
else
wget --timeout="${DOWNLOAD_CONNECT_TIMEOUT}" --read-timeout="${DOWNLOAD_MAX_TIME}" --tries=1 -O "${destination}" "${candidate}" && return 0
status=$?
fi
log "Download failed (${attempt}/${DOWNLOAD_RETRIES}) from ${candidate} (exit ${status})."
if [[ "${attempt}" -lt "${DOWNLOAD_RETRIES}" ]]; then
retry_countdown "${DOWNLOAD_RETRY_DELAY}"
fi
done
log "Trying alternative source if available after: ${candidate}"
done < <(archive_url_candidates "${url}")
return 1
}
cleanup() {
+60 -5
View File
@@ -17,6 +17,10 @@ REPO_URL="${PYTORRENT_REPO_URL:-https://github.com/zdzichu6969/pyTorrent}"
REPO_BRANCH="${PYTORRENT_REPO_BRANCH:-master}"
WORK_DIR="${PYTORRENT_BOOTSTRAP_DIR:-/tmp/pytorrent-stack-installer}"
KEEP_WORK_DIR="${PYTORRENT_KEEP_BOOTSTRAP_DIR:-0}"
DOWNLOAD_RETRIES="${PYTORRENT_DOWNLOAD_RETRIES:-4}"
DOWNLOAD_RETRY_DELAY="${PYTORRENT_DOWNLOAD_RETRY_DELAY:-10}"
DOWNLOAD_CONNECT_TIMEOUT="${PYTORRENT_DOWNLOAD_CONNECT_TIMEOUT:-30}"
DOWNLOAD_MAX_TIME="${PYTORRENT_DOWNLOAD_MAX_TIME:-600}"
default_archive_url() {
case "${REPO_URL%/}" in
@@ -105,14 +109,65 @@ prepare_downloader() {
fail "curl or wget is required and no supported package manager was found."
}
retry_countdown() {
local seconds="$1"
local remaining
for ((remaining=seconds; remaining>0; remaining--)); do
printf 'Retrying in %ss...\r' "${remaining}"
sleep 1
done
[[ "${seconds}" -gt 0 ]] && printf '%*s\r' 40 ''
}
archive_url_candidates() {
local url="$1"
printf '%s\n' "${url}"
case "${url}" in
https://github.com/*/archive/refs/heads/*.tar.gz)
local rest owner repo branch
rest="${url#https://github.com/}"
owner="${rest%%/*}"
rest="${rest#*/}"
repo="${rest%%/*}"
branch="${url##*/}"
branch="${branch%.tar.gz}"
printf 'https://codeload.github.com/%s/%s/tar.gz/refs/heads/%s\n' "${owner}" "${repo}" "${branch}"
;;
https://github.com/*/archive/*.tar.gz)
local rest owner repo ref
rest="${url#https://github.com/}"
owner="${rest%%/*}"
rest="${rest#*/}"
repo="${rest%%/*}"
ref="${url##*/}"
ref="${ref%.tar.gz}"
printf 'https://codeload.github.com/%s/%s/tar.gz/%s\n' "${owner}" "${repo}" "${ref}"
;;
esac
}
download_file() {
local url="$1"
local destination="$2"
if [[ "${DOWNLOADER}" == "curl" ]]; then
curl -fL "${url}" -o "${destination}"
else
wget -O "${destination}" "${url}"
fi
local candidate attempt status
while IFS= read -r candidate; do
[[ -n "${candidate}" ]] || continue
for ((attempt=1; attempt<=DOWNLOAD_RETRIES; attempt++)); do
if [[ "${DOWNLOADER}" == "curl" ]]; then
curl -fL --connect-timeout "${DOWNLOAD_CONNECT_TIMEOUT}" --max-time "${DOWNLOAD_MAX_TIME}" "${candidate}" -o "${destination}" && return 0
status=$?
else
wget --timeout="${DOWNLOAD_CONNECT_TIMEOUT}" --read-timeout="${DOWNLOAD_MAX_TIME}" --tries=1 -O "${destination}" "${candidate}" && return 0
status=$?
fi
log "Download failed (${attempt}/${DOWNLOAD_RETRIES}) from ${candidate} (exit ${status})."
if [[ "${attempt}" -lt "${DOWNLOAD_RETRIES}" ]]; then
retry_countdown "${DOWNLOAD_RETRY_DELAY}"
fi
done
log "Trying alternative source if available after: ${candidate}"
done < <(archive_url_candidates "${url}")
return 1
}
detect_os_family() {
+70 -4
View File
@@ -24,6 +24,54 @@ DEFAULT_CURL_REF = "8.19.0"
DEFAULT_SERVICE_PATH = "/etc/systemd/system/rtorrent@.service"
DEFAULT_SCGI_PORT = 5000
DEFAULT_TORRENT_PORT = 51300
DOWNLOAD_RETRIES = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRIES", "4"))
DOWNLOAD_RETRY_DELAY = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRY_DELAY", "10"))
DOWNLOAD_CONNECT_TIMEOUT = int(os.environ.get("PYTORRENT_DOWNLOAD_CONNECT_TIMEOUT", "30"))
DOWNLOAD_MAX_TIME = int(os.environ.get("PYTORRENT_DOWNLOAD_MAX_TIME", "600"))
def retry_countdown(seconds):
for remaining in range(seconds, 0, -1):
print(f"Retrying in {remaining}s...", end="\r", flush=True)
time.sleep(1)
if seconds > 0:
print(" " * 40, end="\r", flush=True)
def run_with_retry(cmd, *, retries=DOWNLOAD_RETRIES, retry_delay=DOWNLOAD_RETRY_DELAY, retry_label=None, **kwargs):
last_error = None
label = retry_label or " ".join(str(x) for x in cmd[:3])
for attempt in range(1, retries + 1):
try:
return run(cmd, **kwargs)
except InstallError as exc:
last_error = exc
print(f"{label} failed ({attempt}/{retries}): {exc}")
if attempt < retries:
retry_countdown(retry_delay)
raise last_error
def download_url_candidates(url):
candidates = [url]
if url.startswith("https://github.com/c-ares/c-ares/releases/download/v") and url.endswith(".tar.gz"):
version = url.rsplit("/c-ares-", 1)[-1].removesuffix(".tar.gz")
candidates.append(f"https://codeload.github.com/c-ares/c-ares/tar.gz/refs/tags/v{version}")
if url.startswith("https://curl.se/download/curl-") and url.endswith(".tar.gz"):
version = url.rsplit("/curl-", 1)[-1].removesuffix(".tar.gz")
tag = "curl-" + version.replace(".", "_")
candidates.append(f"https://github.com/curl/curl/releases/download/{tag}/curl-{version}.tar.gz")
candidates.append(f"https://codeload.github.com/curl/curl/tar.gz/refs/tags/{tag}")
if "sourceforge.net/projects/xmlrpc-c/files/latest/download" in url:
candidates.append("https://downloads.sourceforge.net/project/xmlrpc-c/latest/download")
if url.startswith("https://downloads.sourceforge.net/project/xmlrpc-c/"):
candidates.append(url.replace("https://downloads.sourceforge.net/", "https://sourceforge.net/projects/").replace("project/xmlrpc-c/", "xmlrpc-c/files/"))
unique = []
for candidate in candidates:
if candidate not in unique:
unique.append(candidate)
return unique
class InstallError(Exception):
@@ -220,17 +268,35 @@ def clone_or_update_repo(repo_url, repo_dir, ref, *, debug=False):
repo_dir = Path(repo_dir)
if not repo_dir.exists():
with Spinner(f"Cloning {repo_dir.name}", enabled=not debug):
run(["git", "clone", repo_url, str(repo_dir)], debug=debug)
run_with_retry(["git", "clone", repo_url, str(repo_dir)], debug=debug, retry_label=f"git clone {repo_url}")
else:
print(f"Repository already exists: {repo_dir}")
with Spinner(f"Checking out {repo_dir.name} -> {ref}", enabled=not debug):
run(["git", "fetch", "--all", "--tags"], cwd=str(repo_dir), debug=debug)
run_with_retry(["git", "fetch", "--all", "--tags"], cwd=str(repo_dir), debug=debug, retry_label=f"git fetch {repo_dir.name}")
run(["git", "checkout", ref], cwd=str(repo_dir), debug=debug)
run(["git", "pull", "--ff-only"], cwd=str(repo_dir), check=False, debug=debug)
run_with_retry(["git", "pull", "--ff-only"], cwd=str(repo_dir), check=False, debug=debug, retry_label=f"git pull {repo_dir.name}")
def download_file(url, destination, *, debug=False):
run(["curl", "-fL", url, "-o", str(destination)], debug=debug)
last_error = None
for candidate in download_url_candidates(url):
for attempt in range(1, DOWNLOAD_RETRIES + 1):
try:
return run([
"curl",
"-fL",
"--connect-timeout", str(DOWNLOAD_CONNECT_TIMEOUT),
"--max-time", str(DOWNLOAD_MAX_TIME),
candidate,
"-o", str(destination),
], debug=debug)
except InstallError as exc:
last_error = exc
print(f"Download failed ({attempt}/{DOWNLOAD_RETRIES}) from {candidate}: {exc}")
if attempt < DOWNLOAD_RETRIES:
retry_countdown(DOWNLOAD_RETRY_DELAY)
print(f"Trying alternative source if available after: {candidate}")
raise last_error or InstallError(f"Download failed: {url}")
def extract_tarball(tarball, destination, *, debug=False):
@@ -24,6 +24,54 @@ DEFAULT_CURL_REF = "8.19.0"
DEFAULT_SERVICE_PATH = "/etc/systemd/system/rtorrent@.service"
DEFAULT_SCGI_PORT = 5000
DEFAULT_TORRENT_PORT = 51300
DOWNLOAD_RETRIES = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRIES", "4"))
DOWNLOAD_RETRY_DELAY = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRY_DELAY", "10"))
DOWNLOAD_CONNECT_TIMEOUT = int(os.environ.get("PYTORRENT_DOWNLOAD_CONNECT_TIMEOUT", "30"))
DOWNLOAD_MAX_TIME = int(os.environ.get("PYTORRENT_DOWNLOAD_MAX_TIME", "600"))
def retry_countdown(seconds):
for remaining in range(seconds, 0, -1):
print(f"Retrying in {remaining}s...", end="\r", flush=True)
time.sleep(1)
if seconds > 0:
print(" " * 40, end="\r", flush=True)
def run_with_retry(cmd, *, retries=DOWNLOAD_RETRIES, retry_delay=DOWNLOAD_RETRY_DELAY, retry_label=None, **kwargs):
last_error = None
label = retry_label or " ".join(str(x) for x in cmd[:3])
for attempt in range(1, retries + 1):
try:
return run(cmd, **kwargs)
except InstallError as exc:
last_error = exc
print(f"{label} failed ({attempt}/{retries}): {exc}")
if attempt < retries:
retry_countdown(retry_delay)
raise last_error
def download_url_candidates(url):
candidates = [url]
if url.startswith("https://github.com/c-ares/c-ares/releases/download/v") and url.endswith(".tar.gz"):
version = url.rsplit("/c-ares-", 1)[-1].removesuffix(".tar.gz")
candidates.append(f"https://codeload.github.com/c-ares/c-ares/tar.gz/refs/tags/v{version}")
if url.startswith("https://curl.se/download/curl-") and url.endswith(".tar.gz"):
version = url.rsplit("/curl-", 1)[-1].removesuffix(".tar.gz")
tag = "curl-" + version.replace(".", "_")
candidates.append(f"https://github.com/curl/curl/releases/download/{tag}/curl-{version}.tar.gz")
candidates.append(f"https://codeload.github.com/curl/curl/tar.gz/refs/tags/{tag}")
if "sourceforge.net/projects/xmlrpc-c/files/latest/download" in url:
candidates.append("https://downloads.sourceforge.net/project/xmlrpc-c/latest/download")
if url.startswith("https://downloads.sourceforge.net/project/xmlrpc-c/"):
candidates.append(url.replace("https://downloads.sourceforge.net/", "https://sourceforge.net/projects/").replace("project/xmlrpc-c/", "xmlrpc-c/files/"))
unique = []
for candidate in candidates:
if candidate not in unique:
unique.append(candidate)
return unique
class InstallError(Exception):
@@ -221,17 +269,35 @@ def clone_or_update_repo(repo_url, repo_dir, ref, *, debug=False):
repo_dir = Path(repo_dir)
if not repo_dir.exists():
with Spinner(f"Cloning {repo_dir.name}", enabled=not debug):
run(["git", "clone", repo_url, str(repo_dir)], debug=debug)
run_with_retry(["git", "clone", repo_url, str(repo_dir)], debug=debug, retry_label=f"git clone {repo_url}")
else:
print(f"Repository already exists: {repo_dir}")
with Spinner(f"Checking out {repo_dir.name} -> {ref}", enabled=not debug):
run(["git", "fetch", "--all", "--tags"], cwd=str(repo_dir), debug=debug)
run_with_retry(["git", "fetch", "--all", "--tags"], cwd=str(repo_dir), debug=debug, retry_label=f"git fetch {repo_dir.name}")
run(["git", "checkout", ref], cwd=str(repo_dir), debug=debug)
run(["git", "pull", "--ff-only"], cwd=str(repo_dir), check=False, debug=debug)
run_with_retry(["git", "pull", "--ff-only"], cwd=str(repo_dir), check=False, debug=debug, retry_label=f"git pull {repo_dir.name}")
def download_file(url, destination, *, debug=False):
run(["curl", "-fL", url, "-o", str(destination)], debug=debug)
last_error = None
for candidate in download_url_candidates(url):
for attempt in range(1, DOWNLOAD_RETRIES + 1):
try:
return run([
"curl",
"-fL",
"--connect-timeout", str(DOWNLOAD_CONNECT_TIMEOUT),
"--max-time", str(DOWNLOAD_MAX_TIME),
candidate,
"-o", str(destination),
], debug=debug)
except InstallError as exc:
last_error = exc
print(f"Download failed ({attempt}/{DOWNLOAD_RETRIES}) from {candidate}: {exc}")
if attempt < DOWNLOAD_RETRIES:
retry_countdown(DOWNLOAD_RETRY_DELAY)
print(f"Trying alternative source if available after: {candidate}")
raise last_error or InstallError(f"Download failed: {url}")
def extract_tarball(tarball, destination, *, debug=False):