Merge pull request 'Start worker' (#28) from start_worker into master

Reviewed-on: #28
This commit was merged in pull request #28.
This commit is contained in:
gru
2026-06-14 23:00:46 +02:00
15 changed files with 1148 additions and 280 deletions
+4
View File
@@ -143,6 +143,8 @@ def create_app() -> Flask:
register_socketio_handlers(socketio) register_socketio_handlers(socketio)
from .services.startup_config import schedule_startup_config_apply from .services.startup_config import schedule_startup_config_apply
schedule_startup_config_apply(socketio) schedule_startup_config_apply(socketio)
from .services.background_automations import start_scheduler as start_background_automation_scheduler
start_background_automation_scheduler(socketio)
from .services.rss import start_scheduler as start_rss_scheduler from .services.rss import start_scheduler as start_rss_scheduler
from .services.ratio_rules import start_scheduler as start_ratio_scheduler from .services.ratio_rules import start_scheduler as start_ratio_scheduler
from .services.download_planner import start_scheduler as start_download_planner_scheduler from .services.download_planner import start_scheduler as start_download_planner_scheduler
@@ -151,4 +153,6 @@ def create_app() -> Flask:
start_ratio_scheduler(socketio) start_ratio_scheduler(socketio)
start_download_planner_scheduler(socketio) start_download_planner_scheduler(socketio)
start_backup_scheduler() start_backup_scheduler()
from .services.background_cache_warmup import start_scheduler as start_cache_warmup_scheduler
start_cache_warmup_scheduler(socketio)
return app return app
+1 -186
View File
@@ -52,192 +52,7 @@ def ok(payload=None):
PORT_CHECK_CACHE_SECONDS = 6 * 60 * 60 from ..services.port_check import port_check_status
def _app_setting_get(key: str):
with connect() as conn:
row = conn.execute("SELECT value FROM app_settings WHERE key=?", (key,)).fetchone()
return row.get("value") if row else None
def _app_setting_set(key: str, value: str):
with connect() as conn:
conn.execute("INSERT OR REPLACE INTO app_settings(key,value) VALUES(?,?)", (key, value))
def _iso_from_epoch(value) -> str | None:
try:
return datetime.fromtimestamp(float(value), timezone.utc).isoformat(timespec="seconds")
except Exception:
return None
def _public_ip(profile: dict | None = None, force: bool = False) -> str:
if profile and bool(profile.get("is_remote")):
return rtorrent.remote_public_ip(profile, force=force)
req = urllib.request.Request("https://api.ipify.org", headers={"User-Agent": "pyTorrent/port-check"})
with urllib.request.urlopen(req, timeout=8) as res:
return res.read(64).decode("utf-8", "replace").strip()
MAX_PORT_CHECK_CANDIDATES = 256
def _parse_port_candidates(value: str, limit: int = MAX_PORT_CHECK_CANDIDATES) -> tuple[list[int], bool]:
"""Return valid incoming port candidates from rTorrent network.port_range.
Note: rTorrent may keep a range/list and pick a random port on start.
The old checker used only the first number, which produced false "closed"
results when another configured port was actually active.
"""
ports: list[int] = []
seen: set[int] = set()
truncated = False
def add(port: int) -> None:
nonlocal truncated
if not 1 <= port <= 65535 or port in seen:
return
if len(ports) >= limit:
truncated = True
return
seen.add(port)
ports.append(port)
for start, end in re.findall(r"(\d{1,5})\s*-\s*(\d{1,5})", value or ""):
a, b = int(start), int(end)
if a > b:
a, b = b, a
for port in range(a, b + 1):
add(port)
if truncated:
break
without_ranges = re.sub(r"\d{1,5}\s*-\s*\d{1,5}", " ", value or "")
for item in re.findall(r"\d{1,5}", without_ranges):
add(int(item))
return ports, truncated
def _incoming_ports(profile: dict) -> dict:
try:
raw_value = str(rtorrent.client_for(profile).call("network.port_range") or "")
except Exception:
raw_value = ""
ports, truncated = _parse_port_candidates(raw_value)
return {"ports": ports, "raw": raw_value, "truncated": truncated}
def _yougetsignal_check(public_ip: str, port: int) -> dict:
body = urllib.parse.urlencode({"remoteAddress": public_ip, "portNumber": str(port)}).encode("utf-8")
req = urllib.request.Request(
"https://ports.yougetsignal.com/check-port.php",
data=body,
headers={
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": "pyTorrent/port-check",
"Accept": "text/html,application/json,*/*",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=12) as res:
text = res.read(8192).decode("utf-8", "replace")
low = text.lower()
if "is open" in low:
return {"status": "open", "source": "yougetsignal", "raw": text[:500]}
if "is closed" in low:
return {"status": "closed", "source": "yougetsignal", "raw": text[:500]}
return {"status": "unknown", "source": "yougetsignal", "raw": text[:500]}
def _local_port_fallback(public_ip: str, port: int) -> dict:
try:
with socket.create_connection((public_ip, port), timeout=3):
return {"status": "open", "source": "local-fallback"}
except Exception as exc:
return {"status": "unknown", "source": "local-fallback", "error": f"Local fallback inconclusive: {exc}"}
def _check_ports(public_ip: str, ports: list[int], checker) -> dict:
checked: list[int] = []
first_closed: dict | None = None
last_result: dict = {"status": "unknown"}
for port in ports:
checked.append(port)
current = checker(public_ip, port)
last_result = current
if current.get("status") == "open":
current.update({"port": port, "open_port": port, "checked_ports": checked})
return current
if current.get("status") == "closed" and first_closed is None:
first_closed = current
result = first_closed or last_result
result.update({"port": ports[0] if ports else None, "open_port": None, "checked_ports": checked})
return result
def port_check_status(force: bool = False) -> dict:
profile = preferences.active_profile()
prefs = preferences.get_preferences()
enabled = bool((prefs or {}).get("port_check_enabled"))
if not profile:
return {"status": "unknown", "enabled": enabled, "error": "No profile"}
port_info = _incoming_ports(profile)
ports = port_info["ports"]
if not ports:
return {"status": "unknown", "enabled": enabled, "error": "Cannot read rTorrent network.port_range"}
ports_key = ",".join(str(port) for port in ports)
cache_key = f"port_check:{profile['id']}:{ports_key}:{int(bool(port_info['truncated']))}"
if not force:
cached = _app_setting_get(cache_key)
if cached:
try:
data = json.loads(cached)
if time.time() - float(data.get("checked_at_epoch") or 0) < PORT_CHECK_CACHE_SECONDS:
data["cached"] = True
data["enabled"] = enabled
if not data.get("checked_at"):
data["checked_at"] = _iso_from_epoch(data.get("checked_at_epoch"))
return data
except Exception:
pass
checked_at_epoch = time.time()
result = {
"status": "unknown",
"enabled": enabled,
"port": ports[0],
"ports": ports,
"port_range": port_info["raw"],
"ports_truncated": port_info["truncated"],
"checked_at_epoch": checked_at_epoch,
"checked_at": _iso_from_epoch(checked_at_epoch),
"cached": False,
}
try:
public_ip = _public_ip(profile, force=force)
result["public_ip"] = public_ip
result["remote"] = bool(profile.get("is_remote"))
result.update(_check_ports(public_ip, ports, _yougetsignal_check))
except Exception as exc:
result["error"] = f"YouGetSignal failed: {exc}"
try:
public_ip = result.get("public_ip") or _public_ip(profile, force=force)
result["public_ip"] = public_ip
result["remote"] = bool(profile.get("is_remote"))
result.update(_check_ports(public_ip, ports, _local_port_fallback))
except Exception as fallback_exc:
result["fallback_error"] = str(fallback_exc)
result["source"] = "none"
_app_setting_set(cache_key, json.dumps(result))
return result
+5
View File
@@ -125,6 +125,11 @@ def app_status():
status["port_check"] = {"status": "disabled", "enabled": False} if not bool((prefs or {}).get("port_check_enabled")) else port_check_status(force=False) status["port_check"] = {"status": "disabled", "enabled": False} if not bool((prefs or {}).get("port_check_enabled")) else port_check_status(force=False)
except Exception as exc: except Exception as exc:
status["port_check"] = {"status": "error", "error": str(exc)} status["port_check"] = {"status": "error", "error": str(exc)}
try:
from ..services import background_cache_warmup
status["background_cache_warmup"] = background_cache_warmup.status()
except Exception as exc:
status["background_cache_warmup"] = {"started": False, "error": str(exc)}
status["api_ms"] = round((time.perf_counter() - started) * 1000, 2) status["api_ms"] = round((time.perf_counter() - started) * 1000, 2)
return ok({"status": status}) return ok({"status": status})
+21
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any from typing import Any
import json import json
import threading
from ..db import connect, default_user_id, utcnow from ..db import connect, default_user_id, utcnow
from . import rtorrent, auth from . import rtorrent, auth
from .preferences import active_profile from .preferences import active_profile
@@ -9,6 +10,19 @@ from .workers import enqueue
AUTOMATION_JOB_CHUNK_SIZE = 100 AUTOMATION_JOB_CHUNK_SIZE = 100
AUTOMATION_LIGHT_ACTIONS = {'start', 'stop', 'pause', 'resume', 'set_label'} AUTOMATION_LIGHT_ACTIONS = {'start', 'stop', 'pause', 'resume', 'set_label'}
_CHECK_LOCKS: dict[tuple[int, int | None], threading.Lock] = {}
_CHECK_LOCKS_GUARD = threading.Lock()
def _check_lock(profile_id: int, rule_id: int | None = None) -> threading.Lock:
"""Prevent overlapping automation runs for the same profile or rule."""
key = (int(profile_id), int(rule_id) if rule_id is not None else None)
with _CHECK_LOCKS_GUARD:
if key not in _CHECK_LOCKS:
_CHECK_LOCKS[key] = threading.Lock()
return _CHECK_LOCKS[key]
def _resolve_user_id(profile: dict[str, Any] | None = None, user_id: int | None = None) -> int: def _resolve_user_id(profile: dict[str, Any] | None = None, user_id: int | None = None) -> int:
@@ -457,6 +471,11 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
profile_id = int(profile['id']) profile_id = int(profile['id'])
if rule_id is not None: if rule_id is not None:
_require_profile_read(profile_id, user_id) _require_profile_read(profile_id, user_id)
lock = _check_lock(profile_id, rule_id)
if not lock.acquire(blocking=False):
# Note: Browser, manual and background checks can now coexist without duplicate rule application.
return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0, 'skipped': True, 'reason': 'Automation check already running'}
try:
rules = _list_enabled_rules_for_profile(profile_id, rule_id=rule_id, force=force) rules = _list_enabled_rules_for_profile(profile_id, rule_id=rule_id, force=force)
if not rules: if not rules:
return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0} return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0}
@@ -504,3 +523,5 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
conn.execute('INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (owner_id, profile_id, rule['id'], torrent_hash, torrent_name, str(rule.get('name') or ''), json.dumps(history_actions), now)) conn.execute('INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (owner_id, profile_id, rule['id'], torrent_hash, torrent_name, str(rule.get('name') or ''), json.dumps(history_actions), now))
batches.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'count': len(changed_hashes), 'actions': history_actions}) batches.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'owner_user_id': owner_id, 'owner_label': rule.get('owner_label'), 'count': len(changed_hashes), 'actions': history_actions})
return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied, 'batches': batches} return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied, 'batches': batches}
finally:
lock.release()
@@ -0,0 +1,146 @@
from __future__ import annotations
import os
import threading
import time
from typing import Any
from ..db import connect, default_user_id
from . import automation_rules, operation_logs, poller_control, rtorrent
from .websocket import emit_profile_event
_started = False
_start_lock = threading.Lock()
_profile_locks: dict[int, threading.Lock] = {}
_profile_locks_lock = threading.Lock()
_last_logged_status: dict[int, str] = {}
def _configured_interval() -> float:
"""Return the minimum background automation interval from environment settings."""
try:
return max(5.0, min(3600.0, float(os.environ.get("PYTORRENT_AUTOMATION_BACKGROUND_INTERVAL_SECONDS", "15"))))
except Exception:
return 15.0
def _profiles() -> list[dict[str, Any]]:
"""Read configured profiles without relying on a browser session."""
with connect() as conn:
return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()]
def _profile_lock(profile_id: int) -> threading.Lock:
"""Keep one automation pass per profile active at a time."""
with _profile_locks_lock:
if profile_id not in _profile_locks:
_profile_locks[profile_id] = threading.Lock()
return _profile_locks[profile_id]
def _owner_user_id(profile: dict[str, Any]) -> int:
"""Use the profile owner for background checks so rule permissions stay stable."""
return int(profile.get("user_id") or default_user_id())
def _profile_interval(profile_id: int) -> float:
"""Reuse the existing queue poller cadence instead of adding another UI setting."""
settings = poller_control.get_settings(profile_id)
return max(_configured_interval(), float(settings.get("queue_stats_interval_seconds") or 15.0))
def _connected(profile: dict[str, Any]) -> tuple[bool, str]:
"""Verify rTorrent connectivity before running automation logic."""
try:
rtorrent.client_for(profile).call("system.client_version")
return True, ""
except Exception as exc:
return False, str(exc)
def _log_status(profile_id: int, status: str, message: str, *, error: str = "") -> None:
"""Log only connectivity state changes to avoid noisy system logs."""
if _last_logged_status.get(profile_id) == status:
return
_last_logged_status[profile_id] = status
severity = "warning" if error else "info"
operation_logs.record(
profile_id,
"background_automation_status",
message,
severity=severity,
source="system",
action="background_automation",
details={"status": status, "error": error},
)
def _run_profile(socketio, profile: dict[str, Any]) -> None:
"""Run one safe background automation pass for a connected profile."""
profile_id = int(profile.get("id") or 0)
if not profile_id:
return
lock = _profile_lock(profile_id)
if not lock.acquire(blocking=False):
return
try:
ok, error = _connected(profile)
if not ok:
_log_status(profile_id, "disconnected", f"Background automations waiting for rTorrent: {error}", error=error)
return
_log_status(profile_id, "connected", "Background automations detected a working rTorrent connection")
result = automation_rules.check(profile, user_id=_owner_user_id(profile), force=False)
if result.get("applied") or result.get("batches"):
operation_logs.record(
profile_id,
"background_automation_run",
"Background automations applied matching rules",
source="system",
action="background_automation",
details={"applied": len(result.get("applied") or []), "batches": len(result.get("batches") or []), "result": result},
user_id=_owner_user_id(profile),
)
emit_profile_event(socketio, "automation_update", result, profile_id)
except Exception as exc:
operation_logs.record(
profile_id,
"background_automation_error",
f"Background automation check failed: {exc}",
severity="warning",
source="system",
action="background_automation",
details={"error": str(exc)},
user_id=_owner_user_id(profile),
)
finally:
lock.release()
def start_scheduler(socketio) -> None:
"""Start browser-independent automation checks once per application process."""
global _started
with _start_lock:
if _started:
return
_started = True
def runner() -> None:
last_run: dict[int, float] = {}
while True:
started = time.monotonic()
next_sleep = _configured_interval()
for profile in _profiles():
profile_id = int(profile.get("id") or 0)
if not profile_id:
continue
interval = _profile_interval(profile_id)
elapsed = started - float(last_run.get(profile_id) or 0.0)
if elapsed < interval:
next_sleep = min(next_sleep, max(1.0, interval - elapsed))
continue
last_run[profile_id] = started
_run_profile(socketio, profile)
next_sleep = min(next_sleep, interval)
socketio.sleep(max(1.0, next_sleep))
socketio.start_background_task(runner)
@@ -0,0 +1,210 @@
from __future__ import annotations
import os
import threading
import time
from typing import Any
from ..db import connect, default_user_id
from . import port_check, preferences, rtorrent, tracker_cache
from .torrent_cache import torrent_cache
STARTUP_DELAY_SECONDS = 60
DEFAULT_TRACKER_INTERVAL_SECONDS = 15 * 60
DEFAULT_PORT_INTERVAL_SECONDS = port_check.PORT_CHECK_CACHE_SECONDS
FAVICON_BATCH_SIZE = 20
_started = False
_start_lock = threading.Lock()
_status_lock = threading.Lock()
_status: dict[str, Any] = {
"started": False,
"tracker_warmup": {},
"port_check": {},
}
def _setting_float(name: str, default: float, minimum: float, maximum: float) -> float:
"""Read a bounded worker interval from the environment."""
# Note: Defaults keep the worker light while still making UI-independent caches fresh after startup.
try:
value = float(os.environ.get(name, str(default)))
except Exception:
value = default
return max(minimum, min(maximum, value))
def _profiles() -> list[dict[str, Any]]:
"""Read every rTorrent profile directly from the database."""
# Note: The worker cannot rely on active browser session state, so it iterates real configured profiles.
with connect() as conn:
return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()]
def _owner_user_id(profile: dict[str, Any]) -> int:
"""Return the profile owner used for profile-scoped preferences."""
return int(profile.get("user_id") or default_user_id())
def _connected(profile: dict[str, Any]) -> tuple[bool, str]:
"""Check rTorrent connectivity without changing user state."""
try:
rtorrent.client_for(profile).call("system.client_version")
return True, ""
except Exception as exc:
return False, str(exc)
def _remember(section: str, profile_id: int, payload: dict[str, Any]) -> None:
"""Store lightweight in-memory diagnostics for app/status."""
# Note: Cache warmups are not user operations, so they stay out of operation logs by default.
with _status_lock:
data = dict(_status.get(section) or {})
data[str(profile_id)] = {**payload, "updated_at_epoch": time.time()}
_status[section] = data
def status() -> dict[str, Any]:
"""Return current worker diagnostics for system status endpoints."""
with _status_lock:
return {
"started": bool(_status.get("started")),
"startup_delay_seconds": STARTUP_DELAY_SECONDS,
"tracker_warmup": dict(_status.get("tracker_warmup") or {}),
"port_check": dict(_status.get("port_check") or {}),
}
def _tracker_domains_from_rows(rows: list[dict[str, Any]], summary: dict[str, Any], profile_id: int) -> list[str]:
"""Build a bounded tracker domain list from fresh summary data and cached rows."""
domains = [str(item.get("domain") or "") for item in summary.get("trackers") or []]
if not domains:
domains = tracker_cache.cached_domains_for_profile(profile_id, limit=200)
return domains
def _warm_tracker_profile(profile: dict[str, Any]) -> None:
"""Warm tracker summary cache and optional favicon cache for one profile."""
# Note: This mirrors the sidebar warmup, but runs from the backend scheduler instead of waiting for the filter panel.
profile_id = int(profile.get("id") or 0)
if not profile_id:
return
ok, error = _connected(profile)
if not ok:
_remember("tracker_warmup", profile_id, {"ok": False, "skipped": True, "reason": "rtorrent_disconnected", "error": error})
return
owner_id = _owner_user_id(profile)
prefs = preferences.get_preferences(owner_id, profile_id)
rows = torrent_cache.snapshot(profile_id)
if not rows:
torrent_cache.refresh(profile)
rows = torrent_cache.snapshot(profile_id)
hashes = [str(row.get("hash") or "") for row in rows if row.get("hash")]
if not hashes:
_remember("tracker_warmup", profile_id, {"ok": True, "skipped": True, "reason": "no_torrents"})
return
loader = lambda h: rtorrent.torrent_trackers(profile, h)
summary = tracker_cache.summary(profile, hashes, loader, scan_limit=tracker_cache.TRACKER_SCAN_LIMIT, include_favicons=False)
warming = False
if int(summary.get("pending") or 0) > 0:
warming = tracker_cache.warm_summary_cache(profile, hashes, loader, batch_size=tracker_cache.TRACKER_SCAN_LIMIT)
favicon_result = {"checked": 0, "cached": 0, "errors": []}
if bool((prefs or {}).get("tracker_favicons_enabled")):
domains = _tracker_domains_from_rows(rows, summary, profile_id)
favicon_result = tracker_cache.warm_favicon_cache(domains, enabled=True, limit=FAVICON_BATCH_SIZE, force=False)
_remember(
"tracker_warmup",
profile_id,
{
"ok": True,
"hashes": len(hashes),
"pending": int(summary.get("pending") or 0),
"scanned_now": int(summary.get("scanned_now") or 0),
"warming": bool(warming),
"favicons_enabled": bool((prefs or {}).get("tracker_favicons_enabled")),
"favicons": favicon_result,
},
)
def _check_port_profile(profile: dict[str, Any]) -> None:
"""Refresh incoming-port status when the profile preference enables it."""
# Note: force=False respects the existing six-hour cache and avoids unnecessary external checks.
profile_id = int(profile.get("id") or 0)
if not profile_id:
return
owner_id = _owner_user_id(profile)
prefs = preferences.get_preferences(owner_id, profile_id)
if not bool((prefs or {}).get("port_check_enabled")):
_remember("port_check", profile_id, {"ok": True, "enabled": False, "skipped": True, "reason": "disabled"})
return
result = port_check.port_check_status(profile=profile, force=False, user_id=owner_id)
_remember(
"port_check",
profile_id,
{
"ok": not bool(result.get("error") and result.get("source") == "none"),
"enabled": True,
"status": result.get("status"),
"cached": bool(result.get("cached")),
"checked_at": result.get("checked_at"),
"error": result.get("error") or result.get("fallback_error") or "",
},
)
def start_scheduler(socketio=None) -> None:
"""Start browser-independent cache warmup and port-check scheduler."""
global _started
with _start_lock:
if _started:
return
_started = True
with _status_lock:
_status["started"] = True
tracker_interval = _setting_float("PYTORRENT_CACHE_WARMUP_INTERVAL_SECONDS", DEFAULT_TRACKER_INTERVAL_SECONDS, 60.0, 24 * 60 * 60.0)
port_interval = _setting_float("PYTORRENT_PORT_CHECK_INTERVAL_SECONDS", DEFAULT_PORT_INTERVAL_SECONDS, 60.0, 24 * 60 * 60.0)
def runner() -> None:
time.sleep(STARTUP_DELAY_SECONDS)
last_tracker: dict[int, float] = {}
last_port: dict[int, float] = {}
while True:
now = time.monotonic()
next_sleep = 60.0
for profile in _profiles():
profile_id = int(profile.get("id") or 0)
if not profile_id:
continue
if now - float(last_tracker.get(profile_id) or 0.0) >= tracker_interval:
last_tracker[profile_id] = now
try:
_warm_tracker_profile(profile)
except Exception as exc:
_remember("tracker_warmup", profile_id, {"ok": False, "error": str(exc)})
if now - float(last_port.get(profile_id) or 0.0) >= port_interval:
last_port[profile_id] = now
try:
_check_port_profile(profile)
except Exception as exc:
_remember("port_check", profile_id, {"ok": False, "error": str(exc)})
next_sleep = min(
next_sleep,
max(1.0, tracker_interval - (time.monotonic() - float(last_tracker.get(profile_id) or 0.0))),
max(1.0, port_interval - (time.monotonic() - float(last_port.get(profile_id) or 0.0))),
)
sleep_for = max(5.0, min(60.0, next_sleep))
if socketio:
socketio.sleep(sleep_for)
else:
time.sleep(sleep_for)
if socketio:
socketio.start_background_task(runner)
else:
threading.Thread(target=runner, daemon=True, name="pytorrent-cache-warmup-scheduler").start()
+41 -1
View File
@@ -8,7 +8,10 @@ from typing import Any
import psutil import psutil
from ..db import connect, default_user_id, utcnow from ..db import connect, default_user_id, utcnow
from . import auth, rtorrent from . import auth, operation_logs, rtorrent
PLANNER_STARTUP_DELAY_SECONDS = 60
_APP_STARTED_AT = time.monotonic()
DEFAULTS = { DEFAULTS = {
"enabled": False, "enabled": False,
@@ -45,6 +48,34 @@ DEFAULTS = {
_LAST_RUN: dict[int, float] = {} _LAST_RUN: dict[int, float] = {}
_LAST_LIMITS: dict[int, tuple[int, int]] = {} _LAST_LIMITS: dict[int, tuple[int, int]] = {}
_HIGH_CPU_SINCE: dict[int, float] = {} _HIGH_CPU_SINCE: dict[int, float] = {}
_PLANNER_CONNECTION_STATUS: dict[int, str] = {}
def _rtorrent_ready(profile: dict) -> tuple[bool, str]:
"""Check rTorrent connectivity before the planner evaluates or applies changes."""
try:
rtorrent.client_for(profile).call("system.client_version")
return True, ""
except Exception as exc:
return False, str(exc)
def _log_connection_status(profile: dict, status: str, message: str, *, error: str = "", user_id: int | None = None) -> None:
"""Record planner connectivity state changes as system operations without noisy repeats."""
profile_id = int(profile.get("id") or 0)
if _PLANNER_CONNECTION_STATUS.get(profile_id) == status:
return
_PLANNER_CONNECTION_STATUS[profile_id] = status
operation_logs.record(
profile_id,
"download_planner_status",
message,
severity="warning" if error else "info",
source="system",
action="download_planner",
details={"status": status, "error": error},
user_id=user_id or int(profile.get("user_id") or 0) or None,
)
def _bool(value: Any) -> bool: def _bool(value: Any) -> bool:
@@ -471,11 +502,20 @@ def enforce(profile: dict, force: bool = False, user_id: int | None = None) -> d
return {"ok": True, "enabled": False, "profile_id": profile_id, "skipped": True, "reason": "planner owner has no write access", "history": history(profile_id, 20), "history_total": history_count(profile_id)} return {"ok": True, "enabled": False, "profile_id": profile_id, "skipped": True, "reason": "planner owner has no write access", "history": history(profile_id, 20), "history_total": history_count(profile_id)}
if not settings.get("enabled"): if not settings.get("enabled"):
return {"ok": True, "enabled": False, "profile_id": profile_id, "history": history(profile_id, 20), "history_total": history_count(profile_id), "preview": preview(profile, user_id=user_id)} return {"ok": True, "enabled": False, "profile_id": profile_id, "history": history(profile_id, 20), "history_total": history_count(profile_id), "preview": preview(profile, user_id=user_id)}
startup_remaining = int(PLANNER_STARTUP_DELAY_SECONDS - (time.monotonic() - _APP_STARTED_AT))
if not force and startup_remaining > 0:
# Note: The background planner keeps the same startup grace as rTorrent config apply, while manual checks still run immediately.
return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True, "reason": "startup_delay", "retry_after_seconds": startup_remaining}
now = time.monotonic() now = time.monotonic()
interval = int(settings.get("check_interval_seconds") or 30) interval = int(settings.get("check_interval_seconds") or 30)
if not force and now - _LAST_RUN.get(profile_id, 0) < interval: if not force and now - _LAST_RUN.get(profile_id, 0) < interval:
return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True} return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True}
_LAST_RUN[profile_id] = now _LAST_RUN[profile_id] = now
ready, connection_error = _rtorrent_ready(profile)
if not ready:
_log_connection_status(profile, "waiting", f"Download Planner is waiting for rTorrent: {connection_error}", error=connection_error, user_id=user_id)
return {"ok": True, "enabled": True, "profile_id": profile_id, "skipped": True, "reason": "rtorrent_unavailable", "error": connection_error, "retry_after_seconds": interval}
_log_connection_status(profile, "connected", "Download Planner detected a working rTorrent connection", user_id=user_id)
decision = evaluate(profile, settings) decision = evaluate(profile, settings)
result: dict[str, Any] = {"ok": True, "enabled": True, **decision, "limits_changed": False, "paused": 0, "resumed": 0} result: dict[str, Any] = {"ok": True, "enabled": True, **decision, "limits_changed": False, "paused": 0, "resumed": 0}
wanted_limits = (int(decision["down"]), int(decision["up"])) wanted_limits = (int(decision["down"]), int(decision["up"]))
+195
View File
@@ -0,0 +1,195 @@
from __future__ import annotations
import json
import re
import socket
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any
from ..db import connect
from . import preferences, rtorrent
PORT_CHECK_CACHE_SECONDS = 6 * 60 * 60
MAX_PORT_CHECK_CANDIDATES = 256
def _app_setting_get(key: str) -> str | None:
with connect() as conn:
row = conn.execute("SELECT value FROM app_settings WHERE key=?", (key,)).fetchone()
return row.get("value") if row else None
def _app_setting_set(key: str, value: str) -> None:
with connect() as conn:
conn.execute("INSERT OR REPLACE INTO app_settings(key,value) VALUES(?,?)", (key, value))
def _iso_from_epoch(value: Any) -> str | None:
try:
return datetime.fromtimestamp(float(value), timezone.utc).isoformat(timespec="seconds")
except Exception:
return None
def _public_ip(profile: dict | None = None, force: bool = False) -> str:
if profile and bool(profile.get("is_remote")):
return rtorrent.remote_public_ip(profile, force=force)
req = urllib.request.Request("https://api.ipify.org", headers={"User-Agent": "pyTorrent/port-check"})
with urllib.request.urlopen(req, timeout=8) as res:
return res.read(64).decode("utf-8", "replace").strip()
def _parse_port_candidates(value: str, limit: int = MAX_PORT_CHECK_CANDIDATES) -> tuple[list[int], bool]:
"""Return valid incoming port candidates from rTorrent network.port_range."""
# Note: rTorrent can keep a range/list and pick a random port on start, so the checker tests all safe candidates.
ports: list[int] = []
seen: set[int] = set()
truncated = False
def add(port: int) -> None:
nonlocal truncated
if not 1 <= port <= 65535 or port in seen:
return
if len(ports) >= limit:
truncated = True
return
seen.add(port)
ports.append(port)
for start, end in re.findall(r"(\d{1,5})\s*-\s*(\d{1,5})", value or ""):
a, b = int(start), int(end)
if a > b:
a, b = b, a
for port in range(a, b + 1):
add(port)
if truncated:
break
without_ranges = re.sub(r"\d{1,5}\s*-\s*\d{1,5}", " ", value or "")
for item in re.findall(r"\d{1,5}", without_ranges):
add(int(item))
return ports, truncated
def _incoming_ports(profile: dict) -> dict:
try:
raw_value = str(rtorrent.client_for(profile).call("network.port_range") or "")
except Exception:
raw_value = ""
ports, truncated = _parse_port_candidates(raw_value)
return {"ports": ports, "raw": raw_value, "truncated": truncated}
def _yougetsignal_check(public_ip: str, port: int) -> dict:
body = urllib.parse.urlencode({"remoteAddress": public_ip, "portNumber": str(port)}).encode("utf-8")
req = urllib.request.Request(
"https://ports.yougetsignal.com/check-port.php",
data=body,
headers={
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": "pyTorrent/port-check",
"Accept": "text/html,application/json,*/*",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=12) as res:
text = res.read(8192).decode("utf-8", "replace")
low = text.lower()
if "is open" in low:
return {"status": "open", "source": "yougetsignal", "raw": text[:500]}
if "is closed" in low:
return {"status": "closed", "source": "yougetsignal", "raw": text[:500]}
return {"status": "unknown", "source": "yougetsignal", "raw": text[:500]}
def _local_port_fallback(public_ip: str, port: int) -> dict:
try:
with socket.create_connection((public_ip, port), timeout=3):
return {"status": "open", "source": "local-fallback"}
except Exception as exc:
return {"status": "unknown", "source": "local-fallback", "error": f"Local fallback inconclusive: {exc}"}
def _check_ports(public_ip: str, ports: list[int], checker) -> dict:
checked: list[int] = []
first_closed: dict | None = None
last_result: dict = {"status": "unknown"}
for port in ports:
checked.append(port)
current = checker(public_ip, port)
last_result = current
if current.get("status") == "open":
current.update({"port": port, "open_port": port, "checked_ports": checked})
return current
if current.get("status") == "closed" and first_closed is None:
first_closed = current
result = first_closed or last_result
result.update({"port": ports[0] if ports else None, "open_port": None, "checked_ports": checked})
return result
def port_check_status(profile: dict | None = None, force: bool = False, user_id: int | None = None) -> dict:
"""Return cached or freshly checked incoming-port status for one rTorrent profile."""
# Note: This service is shared by UI routes and the background worker, so browser startup is not required.
profile = profile or preferences.active_profile(user_id)
prefs = preferences.get_preferences(user_id, int(profile.get("id"))) if profile else preferences.get_preferences(user_id)
enabled = bool((prefs or {}).get("port_check_enabled"))
if not profile:
return {"status": "unknown", "enabled": enabled, "error": "No profile"}
port_info = _incoming_ports(profile)
ports = port_info["ports"]
if not ports:
return {"status": "unknown", "enabled": enabled, "error": "Cannot read rTorrent network.port_range"}
ports_key = ",".join(str(port) for port in ports)
cache_key = f"port_check:{profile['id']}:{ports_key}:{int(bool(port_info['truncated']))}"
if not force:
cached = _app_setting_get(cache_key)
if cached:
try:
data = json.loads(cached)
if time.time() - float(data.get("checked_at_epoch") or 0) < PORT_CHECK_CACHE_SECONDS:
data["cached"] = True
data["enabled"] = enabled
if not data.get("checked_at"):
data["checked_at"] = _iso_from_epoch(data.get("checked_at_epoch"))
return data
except Exception:
pass
checked_at_epoch = time.time()
result = {
"status": "unknown",
"enabled": enabled,
"port": ports[0],
"ports": ports,
"port_range": port_info["raw"],
"ports_truncated": port_info["truncated"],
"checked_at_epoch": checked_at_epoch,
"checked_at": _iso_from_epoch(checked_at_epoch),
"cached": False,
}
try:
public_ip = _public_ip(profile, force=force)
result["public_ip"] = public_ip
result["remote"] = bool(profile.get("is_remote"))
result.update(_check_ports(public_ip, ports, _yougetsignal_check))
except Exception as exc:
result["error"] = f"YouGetSignal failed: {exc}"
try:
public_ip = result.get("public_ip") or _public_ip(profile, force=force)
result["public_ip"] = public_ip
result["remote"] = bool(profile.get("is_remote"))
result.update(_check_ports(public_ip, ports, _local_port_fallback))
except Exception as fallback_exc:
result["fallback_error"] = str(fallback_exc)
result["source"] = "none"
_app_setting_set(cache_key, json.dumps(result))
return result
+87 -10
View File
@@ -1,26 +1,103 @@
from __future__ import annotations from __future__ import annotations
from time import sleep import threading
from . import preferences, rtorrent from time import monotonic
from ..db import connect
from . import operation_logs, rtorrent
_started = False _started = False
_start_lock = threading.Lock()
_applied_profiles: set[int] = set()
_last_status: dict[int, str] = {}
def schedule_startup_config_apply(socketio, delay_seconds: int = 60) -> None: def _profiles() -> list[dict]:
"""Apply saved rTorrent UI overrides after pyTorrent has been running for a moment.""" """Read all configured profiles because startup work has no browser user session."""
with connect() as conn:
return [dict(row) for row in conn.execute("SELECT * FROM rtorrent_profiles ORDER BY id").fetchall()]
def _log_status(profile: dict, status: str, message: str, *, error: str = "", result: dict | None = None) -> None:
"""Write meaningful startup config state changes as system operations."""
profile_id = int(profile.get("id") or 0)
if status in {"waiting", "skipped"} and _last_status.get(profile_id) == status:
return
_last_status[profile_id] = status
operation_logs.record(
profile_id,
"rtorrent_config_startup",
message,
severity="warning" if error else "info",
source="system",
action="rtorrent_config",
details={"status": status, "error": error, "result": result or {}},
user_id=int(profile.get("user_id") or 0) or None,
)
def _rtorrent_ready(profile: dict) -> tuple[bool, str]:
"""Check rTorrent before applying saved runtime overrides."""
try:
rtorrent.client_for(profile).call("system.client_version")
return True, ""
except Exception as exc:
return False, str(exc)
def _apply_profile(socketio, profile: dict) -> None:
"""Apply saved config only after the target rTorrent is reachable."""
profile_id = int(profile.get("id") or 0)
if not profile_id or profile_id in _applied_profiles:
return
ok, error = _rtorrent_ready(profile)
if not ok:
_log_status(profile, "waiting", f"rTorrent config apply is waiting for connection: {error}", error=error)
return
result = rtorrent.apply_startup_overrides(profile)
if result.get("skipped"):
_applied_profiles.add(profile_id)
_log_status(profile, "skipped", "No saved rTorrent startup config overrides to apply", result=result)
return
_applied_profiles.add(profile_id)
_log_status(profile, "applied", "Saved rTorrent startup config overrides applied", result=result)
socketio.emit("rtorrent_config_applied", {"profile_id": profile_id, "result": result})
def schedule_startup_config_apply(socketio, delay_seconds: int = 60, retry_seconds: int = 30, max_wait_seconds: int = 3600) -> None:
"""Apply saved rTorrent UI overrides after the configured startup delay without requiring a browser."""
global _started global _started
with _start_lock:
if _started: if _started:
return return
_started = True _started = True
def runner(): def runner() -> None:
sleep(max(0, int(delay_seconds))) socketio.sleep(max(0, int(delay_seconds)))
started_at = monotonic()
while True:
try: try:
for profile in preferences.list_profiles(): profiles = _profiles()
result = rtorrent.apply_startup_overrides(profile) for profile in profiles:
if not result.get("skipped"): _apply_profile(socketio, profile)
socketio.emit("rtorrent_config_applied", {"profile_id": profile["id"], "result": result}) pending = [int(profile.get("id") or 0) for profile in profiles if int(profile.get("id") or 0) not in _applied_profiles]
if not pending or monotonic() - started_at >= max(0, int(max_wait_seconds)):
for profile in profiles:
profile_id = int(profile.get("id") or 0)
if profile_id in pending:
_log_status(profile, "timeout", "rTorrent config startup apply stopped waiting for connection", error="startup wait timeout")
return
except Exception as exc: except Exception as exc:
operation_logs.record(
None,
"rtorrent_config_startup",
f"rTorrent startup config scheduler failed: {exc}",
severity="warning",
source="system",
action="rtorrent_config",
details={"error": str(exc)},
)
socketio.emit("rtorrent_config_applied", {"ok": False, "error": str(exc)}) socketio.emit("rtorrent_config_applied", {"ok": False, "error": str(exc)})
socketio.sleep(max(5, int(retry_seconds)))
socketio.start_background_task(runner) socketio.start_background_task(runner)
+47
View File
@@ -438,3 +438,50 @@ def favicon_path(domain: str, enabled: bool = True, force: bool = False) -> tupl
(clean, utcnow(), now, "; ".join(errors[-8:]) or "favicon not found"), (clean, utcnow(), now, "; ".join(errors[-8:]) or "favicon not found"),
) )
return None, None return None, None
def cached_domains_for_profile(profile_id: int, limit: int = 200) -> list[str]:
"""Return tracker domains already known for a profile from the summary cache."""
# Note: The background favicon worker reads cached summary rows first, so it does not need the browser sidebar to discover domains.
domains: list[str] = []
seen: set[str] = set()
with connect() as conn:
rows = conn.execute(
"SELECT trackers_json FROM tracker_summary_cache WHERE profile_id=? ORDER BY updated_epoch DESC LIMIT ?",
(int(profile_id), max(1, int(limit or 200))),
).fetchall()
for row in rows:
try:
items = json.loads(row.get("trackers_json") or "[]")
except Exception:
items = []
for item in items if isinstance(items, list) else []:
domain = tracker_domain(str((item or {}).get("url") or (item or {}).get("domain") or "")) or str((item or {}).get("domain") or "")
if domain and domain not in seen:
seen.add(domain)
domains.append(domain)
return domains[:max(1, int(limit or 200))]
def warm_favicon_cache(domains: list[str], enabled: bool = True, limit: int = 20, force: bool = False) -> dict:
"""Warm missing or stale tracker favicons for a bounded list of domains."""
# Note: Favicon lookup can perform network requests, so the caller must keep the batch size small.
clean_domains = []
seen: set[str] = set()
for domain in domains or []:
clean = tracker_domain(domain)
if clean and clean not in seen:
seen.add(clean)
clean_domains.append(clean)
checked = 0
cached = 0
errors: list[dict] = []
for domain in clean_domains[:max(0, int(limit or 0))]:
checked += 1
try:
path, _mime = favicon_path(domain, enabled=enabled, force=force)
if path:
cached += 1
except Exception as exc:
errors.append({"domain": domain, "error": str(exc)})
return {"checked": checked, "cached": cached, "errors": errors[:10]}
+69 -4
View File
@@ -1,9 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from __future__ import annotations from __future__ import annotations
import os
import re import re
import time
from pathlib import Path from pathlib import Path
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
ROOT = Path(__file__).resolve().parents[1] ROOT = Path(__file__).resolve().parents[1]
@@ -31,6 +34,40 @@ GOOGLE_FONT_FAMILIES = (
"Source Sans 3", "Source Sans 3",
) )
GOOGLE_FONT_WEIGHTS = "400;500;600;700;800" GOOGLE_FONT_WEIGHTS = "400;500;600;700;800"
DOWNLOAD_RETRIES = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRIES", "4"))
DOWNLOAD_RETRY_DELAY = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRY_DELAY", "10"))
DOWNLOAD_TIMEOUT = int(os.environ.get("PYTORRENT_DOWNLOAD_TIMEOUT", "180"))
def retry_countdown(seconds: int) -> None:
for remaining in range(seconds, 0, -1):
print(f"Retrying in {remaining}s...", end="\r", flush=True)
time.sleep(1)
if seconds > 0:
print(" " * 40, end="\r", flush=True)
def candidate_urls(url: str) -> list[str]:
candidates = [url]
replacements = (
("https://cdn.jsdelivr.net/npm/bootstrap@", "https://unpkg.com/bootstrap@"),
("https://cdn.jsdelivr.net/npm/bootswatch@", "https://unpkg.com/bootswatch@"),
("https://cdn.jsdelivr.net/npm/swagger-ui-dist@", "https://unpkg.com/swagger-ui-dist@"),
("https://cdn.jsdelivr.net/gh/lipis/flag-icons@", "https://cdn.jsdelivr.net/npm/flag-icons@"),
("https://cdn.jsdelivr.net/gh/DevExpress/bootstrap-themes@master/", "https://raw.githubusercontent.com/DevExpress/bootstrap-themes/master/"),
("https://cdn.socket.io/", "https://cdnjs.cloudflare.com/ajax/libs/socket.io/"),
("https://cdnjs.cloudflare.com/ajax/libs/font-awesome/", "https://cdn.jsdelivr.net/npm/@fortawesome/fontawesome-free@"),
)
for old, new in replacements:
if url.startswith(old):
candidates.append(url.replace(old, new, 1))
# font-awesome has a different path layout on npm/jsDelivr.
candidates = [item.replace("/css/all.min.css", "/css/all.min.css") for item in candidates]
unique = []
for item in candidates:
if item not in unique:
unique.append(item)
return unique
def google_fonts_css_url() -> str: def google_fonts_css_url() -> str:
@@ -147,15 +184,31 @@ def bootstrap_css_asset(theme: str) -> dict[str, str]:
def download(url: str, dest: Path) -> None: def download(url: str, dest: Path) -> None:
dest.parent.mkdir(parents=True, exist_ok=True) dest.parent.mkdir(parents=True, exist_ok=True)
req = Request(url, headers={"User-Agent": "pyTorrent installer"}) last_error: Exception | None = None
with urlopen(req, timeout=60) as response: for candidate in candidate_urls(url):
for attempt in range(1, DOWNLOAD_RETRIES + 1):
try:
req = Request(candidate, headers={"User-Agent": "pyTorrent installer"})
with urlopen(req, timeout=DOWNLOAD_TIMEOUT) as response:
data = response.read() data = response.read()
if not data: if not data:
raise RuntimeError(f"Empty response for {url}") raise RuntimeError(f"Empty response for {candidate}")
tmp = dest.with_suffix(dest.suffix + ".tmp") tmp = dest.with_suffix(dest.suffix + ".tmp")
tmp.write_bytes(data) tmp.write_bytes(data)
tmp.replace(dest) tmp.replace(dest)
if candidate != url:
print(f"OK {dest.relative_to(ROOT)} from fallback {candidate}")
else:
print(f"OK {dest.relative_to(ROOT)}") print(f"OK {dest.relative_to(ROOT)}")
return
except (HTTPError, URLError, TimeoutError, OSError, RuntimeError) as exc:
last_error = exc
print(f"Download failed ({attempt}/{DOWNLOAD_RETRIES}) for {candidate}: {exc}")
if attempt < DOWNLOAD_RETRIES:
retry_countdown(DOWNLOAD_RETRY_DELAY)
if candidate != candidate_urls(url)[-1]:
print(f"Trying alternative source: {candidate_urls(url)[candidate_urls(url).index(candidate) + 1]}")
raise RuntimeError(f"Failed to download {url}: {last_error}")
def download_css_with_assets(url: str, dest: Path) -> None: def download_css_with_assets(url: str, dest: Path) -> None:
@@ -184,10 +237,22 @@ def download_google_fonts_css(url: str, dest: Path) -> None:
"Accept": "text/css,*/*;q=0.1", "Accept": "text/css,*/*;q=0.1",
}, },
) )
with urlopen(req, timeout=60) as response: last_error: Exception | None = None
css = ""
for attempt in range(1, DOWNLOAD_RETRIES + 1):
try:
with urlopen(req, timeout=DOWNLOAD_TIMEOUT) as response:
css = response.read().decode("utf-8", errors="ignore") css = response.read().decode("utf-8", errors="ignore")
if not css.strip(): if not css.strip():
raise RuntimeError(f"Empty response for {url}") raise RuntimeError(f"Empty response for {url}")
break
except (HTTPError, URLError, TimeoutError, OSError, RuntimeError) as exc:
last_error = exc
print(f"Download failed ({attempt}/{DOWNLOAD_RETRIES}) for {url}: {exc}")
if attempt < DOWNLOAD_RETRIES:
retry_countdown(DOWNLOAD_RETRY_DELAY)
if not css.strip():
raise RuntimeError(f"Failed to download {url}: {last_error}")
def replace_url(match: re.Match[str]) -> str: def replace_url(match: re.Match[str]) -> str:
quote = match.group(1) or "" quote = match.group(1) or ""
+59 -3
View File
@@ -24,6 +24,10 @@ REPO_URL="${PYTORRENT_REPO_URL:-https://github.com/zdzichu6969/pyTorrent}"
REPO_BRANCH="${PYTORRENT_REPO_BRANCH:-master}" REPO_BRANCH="${PYTORRENT_REPO_BRANCH:-master}"
WORK_DIR="${PYTORRENT_BOOTSTRAP_DIR:-/tmp/pytorrent-only-installer}" WORK_DIR="${PYTORRENT_BOOTSTRAP_DIR:-/tmp/pytorrent-only-installer}"
KEEP_WORK_DIR="${PYTORRENT_KEEP_BOOTSTRAP_DIR:-0}" KEEP_WORK_DIR="${PYTORRENT_KEEP_BOOTSTRAP_DIR:-0}"
DOWNLOAD_RETRIES="${PYTORRENT_DOWNLOAD_RETRIES:-4}"
DOWNLOAD_RETRY_DELAY="${PYTORRENT_DOWNLOAD_RETRY_DELAY:-10}"
DOWNLOAD_CONNECT_TIMEOUT="${PYTORRENT_DOWNLOAD_CONNECT_TIMEOUT:-30}"
DOWNLOAD_MAX_TIME="${PYTORRENT_DOWNLOAD_MAX_TIME:-600}"
default_archive_url() { default_archive_url() {
case "${REPO_URL%/}" in case "${REPO_URL%/}" in
@@ -61,13 +65,65 @@ prepare_downloader() {
fail "curl or wget is required." fail "curl or wget is required."
} }
retry_countdown() {
local seconds="$1"
local remaining
for ((remaining=seconds; remaining>0; remaining--)); do
printf 'Retrying in %ss...\r' "${remaining}"
sleep 1
done
[[ "${seconds}" -gt 0 ]] && printf '%*s\r' 40 ''
}
archive_url_candidates() {
local url="$1"
printf '%s\n' "${url}"
case "${url}" in
https://github.com/*/archive/refs/heads/*.tar.gz)
local rest owner repo branch
rest="${url#https://github.com/}"
owner="${rest%%/*}"
rest="${rest#*/}"
repo="${rest%%/*}"
branch="${url##*/}"
branch="${branch%.tar.gz}"
printf 'https://codeload.github.com/%s/%s/tar.gz/refs/heads/%s\n' "${owner}" "${repo}" "${branch}"
;;
https://github.com/*/archive/*.tar.gz)
local rest owner repo ref
rest="${url#https://github.com/}"
owner="${rest%%/*}"
rest="${rest#*/}"
repo="${rest%%/*}"
ref="${url##*/}"
ref="${ref%.tar.gz}"
printf 'https://codeload.github.com/%s/%s/tar.gz/%s\n' "${owner}" "${repo}" "${ref}"
;;
esac
}
download_file() { download_file() {
local url="$1" destination="$2" local url="$1"
local destination="$2"
local candidate attempt status
while IFS= read -r candidate; do
[[ -n "${candidate}" ]] || continue
for ((attempt=1; attempt<=DOWNLOAD_RETRIES; attempt++)); do
if [[ "${DOWNLOADER}" == "curl" ]]; then if [[ "${DOWNLOADER}" == "curl" ]]; then
curl -fL "${url}" -o "${destination}" curl -fL --connect-timeout "${DOWNLOAD_CONNECT_TIMEOUT}" --max-time "${DOWNLOAD_MAX_TIME}" "${candidate}" -o "${destination}" && return 0
status=$?
else else
wget -O "${destination}" "${url}" wget --timeout="${DOWNLOAD_CONNECT_TIMEOUT}" --read-timeout="${DOWNLOAD_MAX_TIME}" --tries=1 -O "${destination}" "${candidate}" && return 0
status=$?
fi fi
log "Download failed (${attempt}/${DOWNLOAD_RETRIES}) from ${candidate} (exit ${status})."
if [[ "${attempt}" -lt "${DOWNLOAD_RETRIES}" ]]; then
retry_countdown "${DOWNLOAD_RETRY_DELAY}"
fi
done
log "Trying alternative source if available after: ${candidate}"
done < <(archive_url_candidates "${url}")
return 1
} }
cleanup() { cleanup() {
+57 -2
View File
@@ -17,6 +17,10 @@ REPO_URL="${PYTORRENT_REPO_URL:-https://github.com/zdzichu6969/pyTorrent}"
REPO_BRANCH="${PYTORRENT_REPO_BRANCH:-master}" REPO_BRANCH="${PYTORRENT_REPO_BRANCH:-master}"
WORK_DIR="${PYTORRENT_BOOTSTRAP_DIR:-/tmp/pytorrent-stack-installer}" WORK_DIR="${PYTORRENT_BOOTSTRAP_DIR:-/tmp/pytorrent-stack-installer}"
KEEP_WORK_DIR="${PYTORRENT_KEEP_BOOTSTRAP_DIR:-0}" KEEP_WORK_DIR="${PYTORRENT_KEEP_BOOTSTRAP_DIR:-0}"
DOWNLOAD_RETRIES="${PYTORRENT_DOWNLOAD_RETRIES:-4}"
DOWNLOAD_RETRY_DELAY="${PYTORRENT_DOWNLOAD_RETRY_DELAY:-10}"
DOWNLOAD_CONNECT_TIMEOUT="${PYTORRENT_DOWNLOAD_CONNECT_TIMEOUT:-30}"
DOWNLOAD_MAX_TIME="${PYTORRENT_DOWNLOAD_MAX_TIME:-600}"
default_archive_url() { default_archive_url() {
case "${REPO_URL%/}" in case "${REPO_URL%/}" in
@@ -105,14 +109,65 @@ prepare_downloader() {
fail "curl or wget is required and no supported package manager was found." fail "curl or wget is required and no supported package manager was found."
} }
retry_countdown() {
local seconds="$1"
local remaining
for ((remaining=seconds; remaining>0; remaining--)); do
printf 'Retrying in %ss...\r' "${remaining}"
sleep 1
done
[[ "${seconds}" -gt 0 ]] && printf '%*s\r' 40 ''
}
archive_url_candidates() {
local url="$1"
printf '%s\n' "${url}"
case "${url}" in
https://github.com/*/archive/refs/heads/*.tar.gz)
local rest owner repo branch
rest="${url#https://github.com/}"
owner="${rest%%/*}"
rest="${rest#*/}"
repo="${rest%%/*}"
branch="${url##*/}"
branch="${branch%.tar.gz}"
printf 'https://codeload.github.com/%s/%s/tar.gz/refs/heads/%s\n' "${owner}" "${repo}" "${branch}"
;;
https://github.com/*/archive/*.tar.gz)
local rest owner repo ref
rest="${url#https://github.com/}"
owner="${rest%%/*}"
rest="${rest#*/}"
repo="${rest%%/*}"
ref="${url##*/}"
ref="${ref%.tar.gz}"
printf 'https://codeload.github.com/%s/%s/tar.gz/%s\n' "${owner}" "${repo}" "${ref}"
;;
esac
}
download_file() { download_file() {
local url="$1" local url="$1"
local destination="$2" local destination="$2"
local candidate attempt status
while IFS= read -r candidate; do
[[ -n "${candidate}" ]] || continue
for ((attempt=1; attempt<=DOWNLOAD_RETRIES; attempt++)); do
if [[ "${DOWNLOADER}" == "curl" ]]; then if [[ "${DOWNLOADER}" == "curl" ]]; then
curl -fL "${url}" -o "${destination}" curl -fL --connect-timeout "${DOWNLOAD_CONNECT_TIMEOUT}" --max-time "${DOWNLOAD_MAX_TIME}" "${candidate}" -o "${destination}" && return 0
status=$?
else else
wget -O "${destination}" "${url}" wget --timeout="${DOWNLOAD_CONNECT_TIMEOUT}" --read-timeout="${DOWNLOAD_MAX_TIME}" --tries=1 -O "${destination}" "${candidate}" && return 0
status=$?
fi fi
log "Download failed (${attempt}/${DOWNLOAD_RETRIES}) from ${candidate} (exit ${status})."
if [[ "${attempt}" -lt "${DOWNLOAD_RETRIES}" ]]; then
retry_countdown "${DOWNLOAD_RETRY_DELAY}"
fi
done
log "Trying alternative source if available after: ${candidate}"
done < <(archive_url_candidates "${url}")
return 1
} }
detect_os_family() { detect_os_family() {
+70 -4
View File
@@ -24,6 +24,54 @@ DEFAULT_CURL_REF = "8.19.0"
DEFAULT_SERVICE_PATH = "/etc/systemd/system/rtorrent@.service" DEFAULT_SERVICE_PATH = "/etc/systemd/system/rtorrent@.service"
DEFAULT_SCGI_PORT = 5000 DEFAULT_SCGI_PORT = 5000
DEFAULT_TORRENT_PORT = 51300 DEFAULT_TORRENT_PORT = 51300
DOWNLOAD_RETRIES = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRIES", "4"))
DOWNLOAD_RETRY_DELAY = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRY_DELAY", "10"))
DOWNLOAD_CONNECT_TIMEOUT = int(os.environ.get("PYTORRENT_DOWNLOAD_CONNECT_TIMEOUT", "30"))
DOWNLOAD_MAX_TIME = int(os.environ.get("PYTORRENT_DOWNLOAD_MAX_TIME", "600"))
def retry_countdown(seconds):
for remaining in range(seconds, 0, -1):
print(f"Retrying in {remaining}s...", end="\r", flush=True)
time.sleep(1)
if seconds > 0:
print(" " * 40, end="\r", flush=True)
def run_with_retry(cmd, *, retries=DOWNLOAD_RETRIES, retry_delay=DOWNLOAD_RETRY_DELAY, retry_label=None, **kwargs):
last_error = None
label = retry_label or " ".join(str(x) for x in cmd[:3])
for attempt in range(1, retries + 1):
try:
return run(cmd, **kwargs)
except InstallError as exc:
last_error = exc
print(f"{label} failed ({attempt}/{retries}): {exc}")
if attempt < retries:
retry_countdown(retry_delay)
raise last_error
def download_url_candidates(url):
candidates = [url]
if url.startswith("https://github.com/c-ares/c-ares/releases/download/v") and url.endswith(".tar.gz"):
version = url.rsplit("/c-ares-", 1)[-1].removesuffix(".tar.gz")
candidates.append(f"https://codeload.github.com/c-ares/c-ares/tar.gz/refs/tags/v{version}")
if url.startswith("https://curl.se/download/curl-") and url.endswith(".tar.gz"):
version = url.rsplit("/curl-", 1)[-1].removesuffix(".tar.gz")
tag = "curl-" + version.replace(".", "_")
candidates.append(f"https://github.com/curl/curl/releases/download/{tag}/curl-{version}.tar.gz")
candidates.append(f"https://codeload.github.com/curl/curl/tar.gz/refs/tags/{tag}")
if "sourceforge.net/projects/xmlrpc-c/files/latest/download" in url:
candidates.append("https://downloads.sourceforge.net/project/xmlrpc-c/latest/download")
if url.startswith("https://downloads.sourceforge.net/project/xmlrpc-c/"):
candidates.append(url.replace("https://downloads.sourceforge.net/", "https://sourceforge.net/projects/").replace("project/xmlrpc-c/", "xmlrpc-c/files/"))
unique = []
for candidate in candidates:
if candidate not in unique:
unique.append(candidate)
return unique
class InstallError(Exception): class InstallError(Exception):
@@ -220,17 +268,35 @@ def clone_or_update_repo(repo_url, repo_dir, ref, *, debug=False):
repo_dir = Path(repo_dir) repo_dir = Path(repo_dir)
if not repo_dir.exists(): if not repo_dir.exists():
with Spinner(f"Cloning {repo_dir.name}", enabled=not debug): with Spinner(f"Cloning {repo_dir.name}", enabled=not debug):
run(["git", "clone", repo_url, str(repo_dir)], debug=debug) run_with_retry(["git", "clone", repo_url, str(repo_dir)], debug=debug, retry_label=f"git clone {repo_url}")
else: else:
print(f"Repository already exists: {repo_dir}") print(f"Repository already exists: {repo_dir}")
with Spinner(f"Checking out {repo_dir.name} -> {ref}", enabled=not debug): with Spinner(f"Checking out {repo_dir.name} -> {ref}", enabled=not debug):
run(["git", "fetch", "--all", "--tags"], cwd=str(repo_dir), debug=debug) run_with_retry(["git", "fetch", "--all", "--tags"], cwd=str(repo_dir), debug=debug, retry_label=f"git fetch {repo_dir.name}")
run(["git", "checkout", ref], cwd=str(repo_dir), debug=debug) run(["git", "checkout", ref], cwd=str(repo_dir), debug=debug)
run(["git", "pull", "--ff-only"], cwd=str(repo_dir), check=False, debug=debug) run_with_retry(["git", "pull", "--ff-only"], cwd=str(repo_dir), check=False, debug=debug, retry_label=f"git pull {repo_dir.name}")
def download_file(url, destination, *, debug=False): def download_file(url, destination, *, debug=False):
run(["curl", "-fL", url, "-o", str(destination)], debug=debug) last_error = None
for candidate in download_url_candidates(url):
for attempt in range(1, DOWNLOAD_RETRIES + 1):
try:
return run([
"curl",
"-fL",
"--connect-timeout", str(DOWNLOAD_CONNECT_TIMEOUT),
"--max-time", str(DOWNLOAD_MAX_TIME),
candidate,
"-o", str(destination),
], debug=debug)
except InstallError as exc:
last_error = exc
print(f"Download failed ({attempt}/{DOWNLOAD_RETRIES}) from {candidate}: {exc}")
if attempt < DOWNLOAD_RETRIES:
retry_countdown(DOWNLOAD_RETRY_DELAY)
print(f"Trying alternative source if available after: {candidate}")
raise last_error or InstallError(f"Download failed: {url}")
def extract_tarball(tarball, destination, *, debug=False): def extract_tarball(tarball, destination, *, debug=False):
@@ -24,6 +24,54 @@ DEFAULT_CURL_REF = "8.19.0"
DEFAULT_SERVICE_PATH = "/etc/systemd/system/rtorrent@.service" DEFAULT_SERVICE_PATH = "/etc/systemd/system/rtorrent@.service"
DEFAULT_SCGI_PORT = 5000 DEFAULT_SCGI_PORT = 5000
DEFAULT_TORRENT_PORT = 51300 DEFAULT_TORRENT_PORT = 51300
DOWNLOAD_RETRIES = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRIES", "4"))
DOWNLOAD_RETRY_DELAY = int(os.environ.get("PYTORRENT_DOWNLOAD_RETRY_DELAY", "10"))
DOWNLOAD_CONNECT_TIMEOUT = int(os.environ.get("PYTORRENT_DOWNLOAD_CONNECT_TIMEOUT", "30"))
DOWNLOAD_MAX_TIME = int(os.environ.get("PYTORRENT_DOWNLOAD_MAX_TIME", "600"))
def retry_countdown(seconds):
for remaining in range(seconds, 0, -1):
print(f"Retrying in {remaining}s...", end="\r", flush=True)
time.sleep(1)
if seconds > 0:
print(" " * 40, end="\r", flush=True)
def run_with_retry(cmd, *, retries=DOWNLOAD_RETRIES, retry_delay=DOWNLOAD_RETRY_DELAY, retry_label=None, **kwargs):
last_error = None
label = retry_label or " ".join(str(x) for x in cmd[:3])
for attempt in range(1, retries + 1):
try:
return run(cmd, **kwargs)
except InstallError as exc:
last_error = exc
print(f"{label} failed ({attempt}/{retries}): {exc}")
if attempt < retries:
retry_countdown(retry_delay)
raise last_error
def download_url_candidates(url):
candidates = [url]
if url.startswith("https://github.com/c-ares/c-ares/releases/download/v") and url.endswith(".tar.gz"):
version = url.rsplit("/c-ares-", 1)[-1].removesuffix(".tar.gz")
candidates.append(f"https://codeload.github.com/c-ares/c-ares/tar.gz/refs/tags/v{version}")
if url.startswith("https://curl.se/download/curl-") and url.endswith(".tar.gz"):
version = url.rsplit("/curl-", 1)[-1].removesuffix(".tar.gz")
tag = "curl-" + version.replace(".", "_")
candidates.append(f"https://github.com/curl/curl/releases/download/{tag}/curl-{version}.tar.gz")
candidates.append(f"https://codeload.github.com/curl/curl/tar.gz/refs/tags/{tag}")
if "sourceforge.net/projects/xmlrpc-c/files/latest/download" in url:
candidates.append("https://downloads.sourceforge.net/project/xmlrpc-c/latest/download")
if url.startswith("https://downloads.sourceforge.net/project/xmlrpc-c/"):
candidates.append(url.replace("https://downloads.sourceforge.net/", "https://sourceforge.net/projects/").replace("project/xmlrpc-c/", "xmlrpc-c/files/"))
unique = []
for candidate in candidates:
if candidate not in unique:
unique.append(candidate)
return unique
class InstallError(Exception): class InstallError(Exception):
@@ -221,17 +269,35 @@ def clone_or_update_repo(repo_url, repo_dir, ref, *, debug=False):
repo_dir = Path(repo_dir) repo_dir = Path(repo_dir)
if not repo_dir.exists(): if not repo_dir.exists():
with Spinner(f"Cloning {repo_dir.name}", enabled=not debug): with Spinner(f"Cloning {repo_dir.name}", enabled=not debug):
run(["git", "clone", repo_url, str(repo_dir)], debug=debug) run_with_retry(["git", "clone", repo_url, str(repo_dir)], debug=debug, retry_label=f"git clone {repo_url}")
else: else:
print(f"Repository already exists: {repo_dir}") print(f"Repository already exists: {repo_dir}")
with Spinner(f"Checking out {repo_dir.name} -> {ref}", enabled=not debug): with Spinner(f"Checking out {repo_dir.name} -> {ref}", enabled=not debug):
run(["git", "fetch", "--all", "--tags"], cwd=str(repo_dir), debug=debug) run_with_retry(["git", "fetch", "--all", "--tags"], cwd=str(repo_dir), debug=debug, retry_label=f"git fetch {repo_dir.name}")
run(["git", "checkout", ref], cwd=str(repo_dir), debug=debug) run(["git", "checkout", ref], cwd=str(repo_dir), debug=debug)
run(["git", "pull", "--ff-only"], cwd=str(repo_dir), check=False, debug=debug) run_with_retry(["git", "pull", "--ff-only"], cwd=str(repo_dir), check=False, debug=debug, retry_label=f"git pull {repo_dir.name}")
def download_file(url, destination, *, debug=False): def download_file(url, destination, *, debug=False):
run(["curl", "-fL", url, "-o", str(destination)], debug=debug) last_error = None
for candidate in download_url_candidates(url):
for attempt in range(1, DOWNLOAD_RETRIES + 1):
try:
return run([
"curl",
"-fL",
"--connect-timeout", str(DOWNLOAD_CONNECT_TIMEOUT),
"--max-time", str(DOWNLOAD_MAX_TIME),
candidate,
"-o", str(destination),
], debug=debug)
except InstallError as exc:
last_error = exc
print(f"Download failed ({attempt}/{DOWNLOAD_RETRIES}) from {candidate}: {exc}")
if attempt < DOWNLOAD_RETRIES:
retry_countdown(DOWNLOAD_RETRY_DELAY)
print(f"Trying alternative source if available after: {candidate}")
raise last_error or InstallError(f"Download failed: {url}")
def extract_tarball(tarball, destination, *, debug=False): def extract_tarball(tarball, destination, *, debug=False):